Skip to content

Support for client-go code generated controllers as Controller Watch … #159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/source/internal"

toolscache "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
Expand All @@ -39,7 +40,7 @@ const (
// Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
// which should be processed by event.EventHandlers to enqueue reconcile.Requests.
//
// * Use Kind for events originating in the cluster (eh.g. Pod Create, Pod Update, Deployment Update).
// * Use Kind for events originating in the cluster (e.g. Pod Create, Pod Update, Deployment Update).
//
// * Use Channel for events originating outside the cluster (eh.g. GitHub Webhook callback, Polling external urls).
//
Expand All @@ -51,7 +52,7 @@ type Source interface {
Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

// Kind is used to provide a source of events originating inside the cluster from Watches (eh.g. Pod Create)
// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Type runtime.Object
Expand Down Expand Up @@ -100,7 +101,7 @@ func (ks *Kind) InjectCache(c cache.Cache) error {
var _ Source = &Channel{}

// Channel is used to provide a source of events originating outside the cluster
// (eh.g. GitHub Webhook callback). Channel requires the user to wire the external
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
type Channel struct {
// once ensures the event distribution goroutine will be performed only once
Expand Down Expand Up @@ -221,6 +222,28 @@ func (cs *Channel) syncLoop() {
}
}

// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
type Informer struct {
// Informer is the generated client-go Informer
Informer toolscache.SharedIndexInformer
}

var _ Source = &Informer{}

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (ks *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {

// Informer should have been specified by the user.
if ks.Informer == nil {
return fmt.Errorf("must specify Informer.Informer")
}

ks.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
return nil
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking more about the code in the manager's main fn, I am wondering if we need to add the concept of Source being ready (in this case, informed being sync'ed) from the perspective of wiring everything. Will have to look at the code to reason more about it.

// Func is a function that implements Source
type Func func(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error

Expand Down
160 changes: 160 additions & 0 deletions pkg/source/source_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package source_test

import (
"fmt"
"time"

"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -30,6 +31,8 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubeinformers "k8s.io/client-go/informers"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

Expand Down Expand Up @@ -209,4 +212,161 @@ var _ = Describe("Source", func() {
})
})
})

Describe("Informer", func() {
var c chan struct{}
var rs *appsv1.ReplicaSet
var depInformer toolscache.SharedIndexInformer
var informerFactory kubeinformers.SharedInformerFactory
var stopTest chan struct{}

BeforeEach(func(done Done) {
stopTest = make(chan struct{})
informerFactory = kubeinformers.NewSharedInformerFactory(clientset, time.Second*30)
depInformer = informerFactory.Apps().V1().ReplicaSets().Informer()
informerFactory.Start(stopTest)
Eventually(depInformer.HasSynced).Should(BeTrue())

c = make(chan struct{})
rs = &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{Name: "informer-rs-name"},
Spec: appsv1.ReplicaSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
},
},
}
close(done)
})

AfterEach(func(done Done) {
close(stopTest)
close(done)
})

Context("for a ReplicaSet resource", func() {
It("should provide a ReplicaSet CreateEvent", func(done Done) {
c := make(chan struct{})

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{Informer: depInformer}
err := instance.Start(handler.Funcs{
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()
var err error
rs, err = clientset.AppsV1().ReplicaSets("default").Get(rs.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

Expect(q2).To(BeIdenticalTo(q))
Expect(evt.Meta).To(Equal(rs))
Expect(evt.Object).To(Equal(rs))
close(c)
},
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected UpdateEvent")
},
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected DeleteEvent")
},
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected GenericEvent")
},
}, q)
Expect(err).NotTo(HaveOccurred())

rs, err = clientset.AppsV1().ReplicaSets("default").Create(rs)
Expect(err).NotTo(HaveOccurred())
<-c
close(done)
}, 30)

It("should provide a ReplicaSet UpdateEvent", func(done Done) {
var err error
rs, err = clientset.AppsV1().ReplicaSets("default").Get(rs.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

rs2 := rs.DeepCopy()
rs2.SetLabels(map[string]string{"biz": "baz"})

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{Informer: depInformer}
err = instance.Start(handler.Funcs{
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
},
UpdateFunc: func(evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()
var err error
rs2, err = clientset.AppsV1().ReplicaSets("default").Get(rs.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

Expect(q2).To(Equal(q))
Expect(evt.MetaOld).To(Equal(rs))
Expect(evt.ObjectOld).To(Equal(rs))

Expect(evt.MetaNew).To(Equal(rs2))
Expect(evt.ObjectNew).To(Equal(rs2))

close(c)
},
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected DeleteEvent")
},
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected GenericEvent")
},
}, q)
Expect(err).NotTo(HaveOccurred())

rs2, err = clientset.AppsV1().ReplicaSets("default").Update(rs2)
Expect(err).NotTo(HaveOccurred())
<-c
close(done)
})

It("should provide a ReplicaSet DeletedEvent", func(done Done) {
c := make(chan struct{})

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{Informer: depInformer}
err := instance.Start(handler.Funcs{
CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) {
},
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
},
DeleteFunc: func(evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Expect(q2).To(Equal(q))
Expect(evt.Meta.GetName()).To(Equal(rs.Name))
close(c)
},
GenericFunc: func(event.GenericEvent, workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Fail("Unexpected GenericEvent")
},
}, q)
Expect(err).NotTo(HaveOccurred())

err = clientset.AppsV1().ReplicaSets("default").Delete(rs.Name, &metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
<-c
close(done)
})
})
})
})
10 changes: 5 additions & 5 deletions pkg/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ var _ = Describe("Source", func() {
},
UpdateFunc: func(evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Expect(q2).To(Equal(q))
Expect(q2).To(BeIdenticalTo(q))
Expect(evt.MetaOld).To(Equal(p))
Expect(evt.ObjectOld).To(Equal(p))

Expand Down Expand Up @@ -170,7 +170,7 @@ var _ = Describe("Source", func() {
},
DeleteFunc: func(evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Expect(q2).To(Equal(q))
Expect(q2).To(BeIdenticalTo(q))
Expect(evt.Meta).To(Equal(p))
Expect(evt.Object).To(Equal(p))
close(c)
Expand Down Expand Up @@ -306,7 +306,7 @@ var _ = Describe("Source", func() {
defer GinkgoRecover()
// The empty event should have been filtered out by the predicates,
// and will not be passed to the handler.
Expect(q2).To(Equal(q))
Expect(q2).To(BeIdenticalTo(q))
Expect(evt.Meta).To(Equal(p))
Expect(evt.Object).To(Equal(p))
close(c)
Expand Down Expand Up @@ -429,7 +429,7 @@ var _ = Describe("Source", func() {
},
GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Expect(q2).To(Equal(q))
Expect(q2).To(BeIdenticalTo(q))
Expect(evt.Meta).To(Equal(p))
Expect(evt.Object).To(Equal(p))
resEvent1 = evt
Expand All @@ -453,7 +453,7 @@ var _ = Describe("Source", func() {
},
GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Expect(q2).To(Equal(q))
Expect(q2).To(BeIdenticalTo(q))
Expect(evt.Meta).To(Equal(p))
Expect(evt.Object).To(Equal(p))
resEvent2 = evt
Expand Down