Skip to content

Commit efbc79c

Browse files
committed
Improve UX for using client-go generated Informers
1 parent 161208a commit efbc79c

File tree

4 files changed

+67
-8
lines changed

4 files changed

+67
-8
lines changed

pkg/manager/manager.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@ func (r RunnableFunc) Start(s <-chan struct{}) error {
135135
return r(s)
136136
}
137137

138+
// InformerFactoryRunnable wraps a SharedInformerFactory to make it implement Runnable
139+
func StartAdapter(s func(<-chan struct{})) Runnable {
140+
return RunnableFunc(func(c <-chan struct{}) error {
141+
s(c)
142+
return nil
143+
})
144+
}
145+
138146
// New returns a new Manager for creating Controllers.
139147
func New(config *rest.Config, options Options) (Manager, error) {
140148
// Initialize a rest.config if none was specified

pkg/source/example_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,22 @@ limitations under the License.
1717
package source_test
1818

1919
import (
20+
"time"
21+
22+
"github.com/golang/glog"
2023
"k8s.io/api/core/v1"
24+
kubeinformers "k8s.io/client-go/informers"
25+
"k8s.io/client-go/kubernetes"
2126
"sigs.k8s.io/controller-runtime/pkg/controller"
2227
"sigs.k8s.io/controller-runtime/pkg/event"
2328
"sigs.k8s.io/controller-runtime/pkg/handler"
29+
"sigs.k8s.io/controller-runtime/pkg/manager"
30+
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
2431
"sigs.k8s.io/controller-runtime/pkg/source"
2532
)
2633

2734
var ctrl controller.Controller
35+
var mgr manager.Manager
2836

2937
// This example Watches for Pod Events (e.g. Create / Update / Delete) and enqueues a reconcile.Request
3038
// with the Name and Namespace of the Pod.
@@ -42,3 +50,27 @@ func ExampleChannel() {
4250
&handler.EnqueueRequestForObject{},
4351
)
4452
}
53+
54+
// This example Watches for Service Events (e.g. Create / Update / Delete) and enqueues a reconcile.Request
55+
// with the Name and Namespace of the Service. It uses the client-go generated Service Informer instead of the
56+
// Generic Informer.
57+
func ExampleInformer() {
58+
generatedClient := kubernetes.NewForConfigOrDie(mgr.GetConfig())
59+
generatedInformers := kubeinformers.NewSharedInformerFactory(generatedClient, time.Minute*30)
60+
61+
// Add it to the Manager
62+
if err := mgr.Add(manager.StartAdapter(generatedInformers.Start)); err != nil {
63+
glog.Fatalf("error Adding InformerFactory to the Manager: %v", err)
64+
}
65+
66+
// Setup Watch using the client-go generated Informer
67+
if err := ctrl.Watch(
68+
&source.Informer{InformerProvider: generatedInformers.Core().V1().Services()},
69+
&handler.EnqueueRequestForObject{},
70+
); err != nil {
71+
glog.Fatalf("error Watching Services: %v", err)
72+
}
73+
74+
// Start the Manager
75+
mgr.Start(signals.SetupSignalHandler())
76+
}

pkg/source/source.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,17 @@ func (cs *Channel) syncLoop() {
241241
}
242242
}
243243

244+
type InformerProvider interface {
245+
Informer() toolscache.SharedIndexInformer
246+
}
247+
244248
// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
245249
type Informer struct {
246-
// Informer is the generated client-go Informer
250+
// Informer is the generated client-go Informer. Mutually exclusive with InformerProvider.
247251
Informer toolscache.SharedIndexInformer
252+
253+
// InformerProvider provides a generated client-go Informer. Mutually exclusive with Informer.
254+
InformerProvider InformerProvider
248255
}
249256

250257
var _ Source = &Informer{}
@@ -255,11 +262,23 @@ func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimi
255262
prct ...predicate.Predicate) error {
256263

257264
// Informer should have been specified by the user.
258-
if is.Informer == nil {
259-
return fmt.Errorf("must specify Informer.Informer")
265+
if is.Informer == nil && is.InformerProvider == nil {
266+
return fmt.Errorf("must specify Informer.Informer or Informer.InformerProvider")
267+
}
268+
269+
if is.Informer != nil && is.InformerProvider != nil {
270+
return fmt.Errorf("must specify only one of Informer.Informer and Informer.InformerProvider")
271+
}
272+
273+
if is.Informer != nil {
274+
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
275+
}
276+
277+
if is.InformerProvider != nil {
278+
is.InformerProvider.Informer().AddEventHandler(
279+
internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
260280
}
261281

262-
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
263282
return nil
264283
}
265284

pkg/source/source_suite_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestSource(t *testing.T) {
3434
}
3535

3636
var testenv *envtest.Environment
37-
var config *rest.Config
37+
var cfg *rest.Config
3838
var clientset *kubernetes.Clientset
3939
var icache cache.Cache
4040
var stop chan struct{}
@@ -46,13 +46,13 @@ var _ = BeforeSuite(func(done Done) {
4646
testenv = &envtest.Environment{}
4747

4848
var err error
49-
config, err = testenv.Start()
49+
cfg, err = testenv.Start()
5050
Expect(err).NotTo(HaveOccurred())
5151

52-
clientset, err = kubernetes.NewForConfig(config)
52+
clientset, err = kubernetes.NewForConfig(cfg)
5353
Expect(err).NotTo(HaveOccurred())
5454

55-
icache, err = cache.New(config, cache.Options{})
55+
icache, err = cache.New(cfg, cache.Options{})
5656
Expect(err).NotTo(HaveOccurred())
5757

5858
go func() {

0 commit comments

Comments
 (0)