Skip to content

Commit c893a79

Browse files
authored
Merge pull request kubernetes-sigs#159 from pwittrock/master
Support for client-go code generated controllers as Controller Watch …
2 parents b8b3c36 + ad7e2ab commit c893a79

File tree

3 files changed

+191
-8
lines changed

3 files changed

+191
-8
lines changed

pkg/source/source.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
2828
"sigs.k8s.io/controller-runtime/pkg/source/internal"
2929

30+
toolscache "k8s.io/client-go/tools/cache"
3031
"sigs.k8s.io/controller-runtime/pkg/cache"
3132
"sigs.k8s.io/controller-runtime/pkg/predicate"
3233
)
@@ -39,7 +40,7 @@ const (
3940
// Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
4041
// which should be processed by event.EventHandlers to enqueue reconcile.Requests.
4142
//
42-
// * Use Kind for events originating in the cluster (eh.g. Pod Create, Pod Update, Deployment Update).
43+
// * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update).
4344
//
4445
// * Use Channel for events originating outside the cluster (eh.g. GitHub Webhook callback, Polling external urls).
4546
//
@@ -51,7 +52,7 @@ type Source interface {
5152
Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
5253
}
5354

54-
// Kind is used to provide a source of events originating inside the cluster from Watches (eh.g. Pod Create)
55+
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
5556
type Kind struct {
5657
// Type is the type of object to watch. e.g. &v1.Pod{}
5758
Type runtime.Object
@@ -100,7 +101,7 @@ func (ks *Kind) InjectCache(c cache.Cache) error {
100101
var _ Source = &Channel{}
101102

102103
// Channel is used to provide a source of events originating outside the cluster
103-
// (eh.g. GitHub Webhook callback). Channel requires the user to wire the external
104+
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external
104105
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
105106
type Channel struct {
106107
// once ensures the event distribution goroutine will be performed only once
@@ -221,6 +222,28 @@ func (cs *Channel) syncLoop() {
221222
}
222223
}
223224

225+
// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
226+
type Informer struct {
227+
// Informer is the generated client-go Informer
228+
Informer toolscache.SharedIndexInformer
229+
}
230+
231+
var _ Source = &Informer{}
232+
233+
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
234+
// to enqueue reconcile.Requests.
235+
func (ks *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
236+
prct ...predicate.Predicate) error {
237+
238+
// Informer should have been specified by the user.
239+
if ks.Informer == nil {
240+
return fmt.Errorf("must specify Informer.Informer")
241+
}
242+
243+
ks.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
244+
return nil
245+
}
246+
224247
// Func is a function that implements Source
225248
type Func func(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
226249

pkg/source/source_integration_test.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package source_test
1818

1919
import (
2020
"fmt"
21+
"time"
2122

2223
"sigs.k8s.io/controller-runtime/pkg/event"
2324
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -30,6 +31,8 @@ import (
3031
corev1 "k8s.io/api/core/v1"
3132
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3233
"k8s.io/apimachinery/pkg/runtime"
34+
kubeinformers "k8s.io/client-go/informers"
35+
toolscache "k8s.io/client-go/tools/cache"
3336
"k8s.io/client-go/util/workqueue"
3437
)
3538

@@ -209,4 +212,161 @@ var _ = Describe("Source", func() {
209212
})
210213
})
211214
})
215+
216+
Describe("Informer", func() {
217+
var c chan struct{}
218+
var rs *appsv1.ReplicaSet
219+
var depInformer toolscache.SharedIndexInformer
220+
var informerFactory kubeinformers.SharedInformerFactory
221+
var stopTest chan struct{}
222+
223+
BeforeEach(func(done Done) {
224+
stopTest = make(chan struct{})
225+
informerFactory = kubeinformers.NewSharedInformerFactory(clientset, time.Second*30)
226+
depInformer = informerFactory.Apps().V1().ReplicaSets().Informer()
227+
informerFactory.Start(stopTest)
228+
Eventually(depInformer.HasSynced).Should(BeTrue())
229+
230+
c = make(chan struct{})
231+
rs = &appsv1.ReplicaSet{
232+
ObjectMeta: metav1.ObjectMeta{Name: "informer-rs-name"},
233+
Spec: appsv1.ReplicaSetSpec{
234+
Selector: &metav1.LabelSelector{
235+
MatchLabels: map[string]string{"foo": "bar"},
236+
},
237+
Template: corev1.PodTemplateSpec{
238+
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
239+
Spec: corev1.PodSpec{
240+
Containers: []corev1.Container{
241+
{
242+
Name: "nginx",
243+
Image: "nginx",
244+
},
245+
},
246+
},
247+
},
248+
},
249+
}
250+
close(done)
251+
})
252+
253+
AfterEach(func(done Done) {
254+
close(stopTest)
255+
close(done)
256+
})
257+
258+
Context("for a ReplicaSet resource", func() {
259+
It("should provide a ReplicaSet CreateEvent", func(done Done) {
260+
c := make(chan struct{})
261+
262+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
263+
instance := &source.Informer{Informer: depInformer}
264+
err := instance.Start(handler.Funcs{
265+
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
266+
defer GinkgoRecover()
267+
var err error
268+
rs, err = clientset.AppsV1().ReplicaSets("default").Get(rs.Name, metav1.GetOptions{})
269+
Expect(err).NotTo(HaveOccurred())
270+
271+
Expect(q2).To(BeIdenticalTo(q))
272+
Expect(evt.Meta).To(Equal(rs))
273+
Expect(evt.Object).To(Equal(rs))
274+
close(c)
275+
},
276+
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
277+
defer GinkgoRecover()
278+
Fail("Unexpected UpdateEvent")
279+
},
280+
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
281+
defer GinkgoRecover()
282+
Fail("Unexpected DeleteEvent")
283+
},
284+
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {
285+
defer GinkgoRecover()
286+
Fail("Unexpected GenericEvent")
287+
},
288+
}, q)
289+
Expect(err).NotTo(HaveOccurred())
290+
291+
rs, err = clientset.AppsV1().ReplicaSets("default").Create(rs)
292+
Expect(err).NotTo(HaveOccurred())
293+
<-c
294+
close(done)
295+
}, 30)
296+
297+
It("should provide a ReplicaSet UpdateEvent", func(done Done) {
298+
var err error
299+
rs, err = clientset.AppsV1().ReplicaSets("default").Get(rs.Name, metav1.GetOptions{})
300+
Expect(err).NotTo(HaveOccurred())
301+
302+
rs2 := rs.DeepCopy()
303+
rs2.SetLabels(map[string]string{"biz": "baz"})
304+
305+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
306+
instance := &source.Informer{Informer: depInformer}
307+
err = instance.Start(handler.Funcs{
308+
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
309+
},
310+
UpdateFunc: func(evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) {
311+
defer GinkgoRecover()
312+
var err error
313+
rs2, err = clientset.AppsV1().ReplicaSets("default").Get(rs.Name, metav1.GetOptions{})
314+
Expect(err).NotTo(HaveOccurred())
315+
316+
Expect(q2).To(Equal(q))
317+
Expect(evt.MetaOld).To(Equal(rs))
318+
Expect(evt.ObjectOld).To(Equal(rs))
319+
320+
Expect(evt.MetaNew).To(Equal(rs2))
321+
Expect(evt.ObjectNew).To(Equal(rs2))
322+
323+
close(c)
324+
},
325+
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
326+
defer GinkgoRecover()
327+
Fail("Unexpected DeleteEvent")
328+
},
329+
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {
330+
defer GinkgoRecover()
331+
Fail("Unexpected GenericEvent")
332+
},
333+
}, q)
334+
Expect(err).NotTo(HaveOccurred())
335+
336+
rs2, err = clientset.AppsV1().ReplicaSets("default").Update(rs2)
337+
Expect(err).NotTo(HaveOccurred())
338+
<-c
339+
close(done)
340+
})
341+
342+
It("should provide a ReplicaSet DeletedEvent", func(done Done) {
343+
c := make(chan struct{})
344+
345+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
346+
instance := &source.Informer{Informer: depInformer}
347+
err := instance.Start(handler.Funcs{
348+
CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) {
349+
},
350+
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
351+
},
352+
DeleteFunc: func(evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) {
353+
defer GinkgoRecover()
354+
Expect(q2).To(Equal(q))
355+
Expect(evt.Meta.GetName()).To(Equal(rs.Name))
356+
close(c)
357+
},
358+
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {
359+
defer GinkgoRecover()
360+
Fail("Unexpected GenericEvent")
361+
},
362+
}, q)
363+
Expect(err).NotTo(HaveOccurred())
364+
365+
err = clientset.AppsV1().ReplicaSets("default").Delete(rs.Name, &metav1.DeleteOptions{})
366+
Expect(err).NotTo(HaveOccurred())
367+
<-c
368+
close(done)
369+
})
370+
})
371+
})
212372
})

pkg/source/source_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ var _ = Describe("Source", func() {
116116
},
117117
UpdateFunc: func(evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) {
118118
defer GinkgoRecover()
119-
Expect(q2).To(Equal(q))
119+
Expect(q2).To(BeIdenticalTo(q))
120120
Expect(evt.MetaOld).To(Equal(p))
121121
Expect(evt.ObjectOld).To(Equal(p))
122122

@@ -170,7 +170,7 @@ var _ = Describe("Source", func() {
170170
},
171171
DeleteFunc: func(evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) {
172172
defer GinkgoRecover()
173-
Expect(q2).To(Equal(q))
173+
Expect(q2).To(BeIdenticalTo(q))
174174
Expect(evt.Meta).To(Equal(p))
175175
Expect(evt.Object).To(Equal(p))
176176
close(c)
@@ -306,7 +306,7 @@ var _ = Describe("Source", func() {
306306
defer GinkgoRecover()
307307
// The empty event should have been filtered out by the predicates,
308308
// and will not be passed to the handler.
309-
Expect(q2).To(Equal(q))
309+
Expect(q2).To(BeIdenticalTo(q))
310310
Expect(evt.Meta).To(Equal(p))
311311
Expect(evt.Object).To(Equal(p))
312312
close(c)
@@ -429,7 +429,7 @@ var _ = Describe("Source", func() {
429429
},
430430
GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
431431
defer GinkgoRecover()
432-
Expect(q2).To(Equal(q))
432+
Expect(q2).To(BeIdenticalTo(q))
433433
Expect(evt.Meta).To(Equal(p))
434434
Expect(evt.Object).To(Equal(p))
435435
resEvent1 = evt
@@ -453,7 +453,7 @@ var _ = Describe("Source", func() {
453453
},
454454
GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
455455
defer GinkgoRecover()
456-
Expect(q2).To(Equal(q))
456+
Expect(q2).To(BeIdenticalTo(q))
457457
Expect(evt.Meta).To(Equal(p))
458458
Expect(evt.Object).To(Equal(p))
459459
resEvent2 = evt

0 commit comments

Comments
 (0)