Skip to content

Commit da9d35c

Browse files
author
Alex Zielenski
authored
✨ Add TransformFuncByObject Option for Informer Cache (#1805)
* add new cache.Options field to customize transform * fixup! add new cache.Options field to customize transform * fixup! add new cache.Options field to customize transform flatten arrow code
1 parent eb39b8e commit da9d35c

File tree

5 files changed

+323
-11
lines changed

5 files changed

+323
-11
lines changed

pkg/cache/cache.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,18 @@ type Options struct {
128128
// Be very careful with this, when enabled you must DeepCopy any object before mutating it,
129129
// otherwise you will mutate the object in the cache.
130130
UnsafeDisableDeepCopyByObject DisableDeepCopyByObject
131+
132+
// TransformByObject is a map from GVKs to transformer functions which
133+
// get applied when objects of the transformation are about to be committed
134+
// to cache.
135+
//
136+
// This function is called both for new objects to enter the cache,
137+
// and for updated objects.
138+
TransformByObject TransformByObject
139+
140+
// DefaultTransform is the transform used for all GVKs which do
141+
// not have an explicit transform func set in TransformByObject
142+
DefaultTransform toolscache.TransformFunc
131143
}
132144

133145
var defaultResyncTime = 10 * time.Hour
@@ -146,7 +158,12 @@ func New(config *rest.Config, opts Options) (Cache, error) {
146158
if err != nil {
147159
return nil, err
148160
}
149-
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK)
161+
transformByGVK, err := convertToTransformByKindAndGVK(opts.TransformByObject, opts.DefaultTransform, opts.Scheme)
162+
if err != nil {
163+
return nil, err
164+
}
165+
166+
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK, transformByGVK)
150167
return &informerCache{InformersMap: im}, nil
151168
}
152169

@@ -241,3 +258,18 @@ func convertToDisableDeepCopyByGVK(disableDeepCopyByObject DisableDeepCopyByObje
241258
}
242259
return disableDeepCopyByGVK, nil
243260
}
261+
262+
// TransformByObject associate a client.Object's GVK to a transformer function
263+
// to be applied when storing the object into the cache.
264+
type TransformByObject map[client.Object]toolscache.TransformFunc
265+
266+
func convertToTransformByKindAndGVK(t TransformByObject, defaultTransform toolscache.TransformFunc, scheme *runtime.Scheme) (internal.TransformFuncByObject, error) {
267+
result := internal.NewTransformFuncByObject()
268+
for obj, transformation := range t {
269+
if err := result.Set(obj, scheme, transformation); err != nil {
270+
return nil, err
271+
}
272+
}
273+
result.SetDefault(defaultTransform)
274+
return result, nil
275+
}

pkg/cache/cache_test.go

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@ import (
2929

3030
corev1 "k8s.io/api/core/v1"
3131
apierrors "k8s.io/apimachinery/pkg/api/errors"
32+
"k8s.io/apimachinery/pkg/api/meta"
3233
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3334
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3435
"k8s.io/apimachinery/pkg/fields"
3536
"k8s.io/apimachinery/pkg/labels"
37+
"k8s.io/apimachinery/pkg/runtime"
3638
"k8s.io/apimachinery/pkg/runtime/schema"
3739
kscheme "k8s.io/client-go/kubernetes/scheme"
3840
"k8s.io/client-go/rest"
@@ -121,6 +123,221 @@ var _ = Describe("Multi-Namespace Informer Cache", func() {
121123
var _ = Describe("Informer Cache without DeepCopy", func() {
122124
CacheTest(cache.New, cache.Options{UnsafeDisableDeepCopyByObject: cache.DisableDeepCopyByObject{cache.ObjectAll{}: true}})
123125
})
126+
127+
var _ = Describe("Cache with transformers", func() {
128+
var (
129+
informerCache cache.Cache
130+
informerCacheCtx context.Context
131+
informerCacheCancel context.CancelFunc
132+
knownPod1 client.Object
133+
knownPod2 client.Object
134+
knownPod3 client.Object
135+
knownPod4 client.Object
136+
knownPod5 client.Object
137+
knownPod6 client.Object
138+
)
139+
140+
getTransformValue := func(obj client.Object) string {
141+
accessor, err := meta.Accessor(obj)
142+
if err == nil {
143+
annotations := accessor.GetAnnotations()
144+
if val, exists := annotations["transformed"]; exists {
145+
return val
146+
}
147+
}
148+
return ""
149+
}
150+
151+
BeforeEach(func() {
152+
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
153+
Expect(cfg).NotTo(BeNil())
154+
155+
By("creating three pods")
156+
cl, err := client.New(cfg, client.Options{})
157+
Expect(err).NotTo(HaveOccurred())
158+
err = ensureNode(testNodeOne, cl)
159+
Expect(err).NotTo(HaveOccurred())
160+
err = ensureNamespace(testNamespaceOne, cl)
161+
Expect(err).NotTo(HaveOccurred())
162+
err = ensureNamespace(testNamespaceTwo, cl)
163+
Expect(err).NotTo(HaveOccurred())
164+
err = ensureNamespace(testNamespaceThree, cl)
165+
Expect(err).NotTo(HaveOccurred())
166+
// Includes restart policy since these objects are indexed on this field.
167+
knownPod1 = createPod("test-pod-1", testNamespaceOne, corev1.RestartPolicyNever)
168+
knownPod2 = createPod("test-pod-2", testNamespaceTwo, corev1.RestartPolicyAlways)
169+
knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, corev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"})
170+
knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, corev1.RestartPolicyNever, map[string]string{"common-label": "common"})
171+
knownPod5 = createPod("test-pod-5", testNamespaceOne, corev1.RestartPolicyNever)
172+
knownPod6 = createPod("test-pod-6", testNamespaceTwo, corev1.RestartPolicyAlways)
173+
174+
podGVK := schema.GroupVersionKind{
175+
Kind: "Pod",
176+
Version: "v1",
177+
}
178+
179+
knownPod1.GetObjectKind().SetGroupVersionKind(podGVK)
180+
knownPod2.GetObjectKind().SetGroupVersionKind(podGVK)
181+
knownPod3.GetObjectKind().SetGroupVersionKind(podGVK)
182+
knownPod4.GetObjectKind().SetGroupVersionKind(podGVK)
183+
knownPod5.GetObjectKind().SetGroupVersionKind(podGVK)
184+
knownPod6.GetObjectKind().SetGroupVersionKind(podGVK)
185+
186+
By("creating the informer cache")
187+
informerCache, err = cache.New(cfg, cache.Options{
188+
DefaultTransform: func(i interface{}) (interface{}, error) {
189+
obj := i.(runtime.Object)
190+
Expect(obj).NotTo(BeNil())
191+
192+
accessor, err := meta.Accessor(obj)
193+
Expect(err).To(BeNil())
194+
annotations := accessor.GetAnnotations()
195+
196+
if _, exists := annotations["transformed"]; exists {
197+
// Avoid performing transformation multiple times.
198+
return i, nil
199+
}
200+
201+
if annotations == nil {
202+
annotations = make(map[string]string)
203+
}
204+
annotations["transformed"] = "default"
205+
accessor.SetAnnotations(annotations)
206+
return i, nil
207+
},
208+
TransformByObject: cache.TransformByObject{
209+
&corev1.Pod{}: func(i interface{}) (interface{}, error) {
210+
obj := i.(runtime.Object)
211+
Expect(obj).NotTo(BeNil())
212+
accessor, err := meta.Accessor(obj)
213+
Expect(err).To(BeNil())
214+
215+
annotations := accessor.GetAnnotations()
216+
if _, exists := annotations["transformed"]; exists {
217+
// Avoid performing transformation multiple times.
218+
return i, nil
219+
}
220+
221+
if annotations == nil {
222+
annotations = make(map[string]string)
223+
}
224+
annotations["transformed"] = "explicit"
225+
accessor.SetAnnotations(annotations)
226+
return i, nil
227+
},
228+
},
229+
})
230+
Expect(err).NotTo(HaveOccurred())
231+
By("running the cache and waiting for it to sync")
232+
// pass as an arg so that we don't race between close and re-assign
233+
go func(ctx context.Context) {
234+
defer GinkgoRecover()
235+
Expect(informerCache.Start(ctx)).To(Succeed())
236+
}(informerCacheCtx)
237+
Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue())
238+
})
239+
240+
AfterEach(func() {
241+
By("cleaning up created pods")
242+
deletePod(knownPod1)
243+
deletePod(knownPod2)
244+
deletePod(knownPod3)
245+
deletePod(knownPod4)
246+
deletePod(knownPod5)
247+
deletePod(knownPod6)
248+
249+
informerCacheCancel()
250+
})
251+
252+
Context("with structured objects", func() {
253+
It("should apply transformers to explicitly specified GVKS", func() {
254+
By("listing pods")
255+
out := corev1.PodList{}
256+
Expect(informerCache.List(context.Background(), &out)).To(Succeed())
257+
258+
By("verifying that the returned pods were transformed")
259+
for i := 0; i < len(out.Items); i++ {
260+
Expect(getTransformValue(&out.Items[i])).To(BeIdenticalTo("explicit"))
261+
}
262+
})
263+
264+
It("should apply default transformer to objects when none is specified", func() {
265+
By("getting the Kubernetes service")
266+
svc := &corev1.Service{}
267+
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
268+
Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())
269+
270+
By("verifying that the returned service was transformed")
271+
Expect(getTransformValue(svc)).To(BeIdenticalTo("default"))
272+
})
273+
})
274+
275+
Context("with unstructured objects", func() {
276+
It("should apply transformers to explicitly specified GVKS", func() {
277+
By("listing pods")
278+
out := unstructured.UnstructuredList{}
279+
out.SetGroupVersionKind(schema.GroupVersionKind{
280+
Group: "",
281+
Version: "v1",
282+
Kind: "PodList",
283+
})
284+
Expect(informerCache.List(context.Background(), &out)).To(Succeed())
285+
286+
By("verifying that the returned pods were transformed")
287+
for i := 0; i < len(out.Items); i++ {
288+
Expect(getTransformValue(&out.Items[i])).To(BeIdenticalTo("explicit"))
289+
}
290+
})
291+
292+
It("should apply default transformer to objects when none is specified", func() {
293+
By("getting the Kubernetes service")
294+
svc := &unstructured.Unstructured{}
295+
svc.SetGroupVersionKind(schema.GroupVersionKind{
296+
Group: "",
297+
Version: "v1",
298+
Kind: "Service",
299+
})
300+
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
301+
Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())
302+
303+
By("verifying that the returned service was transformed")
304+
Expect(getTransformValue(svc)).To(BeIdenticalTo("default"))
305+
})
306+
})
307+
308+
Context("with metadata-only objects", func() {
309+
It("should apply transformers to explicitly specified GVKS", func() {
310+
By("listing pods")
311+
out := metav1.PartialObjectMetadataList{}
312+
out.SetGroupVersionKind(schema.GroupVersionKind{
313+
Group: "",
314+
Version: "v1",
315+
Kind: "PodList",
316+
})
317+
Expect(informerCache.List(context.Background(), &out)).To(Succeed())
318+
319+
By("verifying that the returned pods were transformed")
320+
for i := 0; i < len(out.Items); i++ {
321+
Expect(getTransformValue(&out.Items[i])).To(BeIdenticalTo("explicit"))
322+
}
323+
})
324+
It("should apply default transformer to objects when none is specified", func() {
325+
By("getting the Kubernetes service")
326+
svc := &metav1.PartialObjectMetadata{}
327+
svc.SetGroupVersionKind(schema.GroupVersionKind{
328+
Group: "",
329+
Version: "v1",
330+
Kind: "Service",
331+
})
332+
svcKey := client.ObjectKey{Namespace: "default", Name: "kubernetes"}
333+
Expect(informerCache.Get(context.Background(), svcKey, svc)).To(Succeed())
334+
335+
By("verifying that the returned service was transformed")
336+
Expect(getTransformValue(svc)).To(BeIdenticalTo("default"))
337+
})
338+
})
339+
})
340+
124341
var _ = Describe("Cache with selectors", func() {
125342
defer GinkgoRecover()
126343
var (

pkg/cache/internal/deleg_map.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,12 @@ func NewInformersMap(config *rest.Config,
5252
namespace string,
5353
selectors SelectorsByGVK,
5454
disableDeepCopy DisableDeepCopyByGVK,
55+
transformers TransformFuncByObject,
5556
) *InformersMap {
5657
return &InformersMap{
57-
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
58-
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
59-
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
58+
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers),
59+
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers),
60+
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers),
6061

6162
Scheme: scheme,
6263
}
@@ -108,18 +109,18 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
108109

109110
// newStructuredInformersMap creates a new InformersMap for structured objects.
110111
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
111-
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
112-
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createStructuredListWatch)
112+
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap {
113+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createStructuredListWatch)
113114
}
114115

115116
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
116117
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
117-
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
118-
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createUnstructuredListWatch)
118+
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap {
119+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createUnstructuredListWatch)
119120
}
120121

121122
// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
122123
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
123-
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK) *specificInformersMap {
124-
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createMetadataListWatch)
124+
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, transformers TransformFuncByObject) *specificInformersMap {
125+
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, transformers, createMetadataListWatch)
125126
}

pkg/cache/internal/informers_map.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ func newSpecificInformersMap(config *rest.Config,
5454
namespace string,
5555
selectors SelectorsByGVK,
5656
disableDeepCopy DisableDeepCopyByGVK,
57-
createListWatcher createListWatcherFunc) *specificInformersMap {
57+
transformers TransformFuncByObject,
58+
createListWatcher createListWatcherFunc,
59+
) *specificInformersMap {
5860
ip := &specificInformersMap{
5961
config: config,
6062
Scheme: scheme,
@@ -68,6 +70,7 @@ func newSpecificInformersMap(config *rest.Config,
6870
namespace: namespace,
6971
selectors: selectors.forGVK,
7072
disableDeepCopy: disableDeepCopy,
73+
transformers: transformers,
7174
}
7275
return ip
7376
}
@@ -135,6 +138,9 @@ type specificInformersMap struct {
135138

136139
// disableDeepCopy indicates not to deep copy objects during get or list objects.
137140
disableDeepCopy DisableDeepCopyByGVK
141+
142+
// transform funcs are applied to objects before they are committed to the cache
143+
transformers TransformFuncByObject
138144
}
139145

140146
// Start calls Run on each of the informers and sets started to true. Blocks on the context.
@@ -227,6 +233,12 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
227233
ni := cache.NewSharedIndexInformer(lw, obj, resyncPeriod(ip.resync)(), cache.Indexers{
228234
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
229235
})
236+
237+
// Check to see if there is a transformer for this gvk
238+
if err := ni.SetTransform(ip.transformers.Get(gvk)); err != nil {
239+
return nil, false, err
240+
}
241+
230242
rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
231243
if err != nil {
232244
return nil, false, err

0 commit comments

Comments
 (0)