Skip to content

Commit fb97e26

Browse files
committed
implement InjectRecorder
1 parent 5a1df6f commit fb97e26

File tree

7 files changed

+149
-2
lines changed

7 files changed

+149
-2
lines changed

pkg/client/config/config.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,22 @@ func GetKubernetesClientSet() (*kubernetes.Clientset, error) {
102102
// in cluster and use the cluster provided kubeconfig.
103103
//
104104
// Will log.Fatal if KubernetesInformers cannot be created
105-
func GetKubernetesClientSetOrDie() (*kubernetes.Clientset, error) {
105+
func GetKubernetesClientSetOrDie() *kubernetes.Clientset {
106106
cs, err := GetKubernetesClientSet()
107107
if err != nil {
108108
log.Fatalf("%v", err)
109109
}
110-
return cs, nil
110+
return cs
111+
}
112+
113+
// GetKubernetesClientSetOrDieWithConfig creates a *kubernetes.ClientSet with given config,
114+
// for talking to a Kubernetes apiserver.
115+
func GetKubernetesClientSetOrDieWithConfig(config *rest.Config) *kubernetes.Clientset {
116+
cs, err := kubernetes.NewForConfig(config)
117+
if err != nil {
118+
log.Fatalf("%v", err)
119+
}
120+
return cs
111121
}
112122

113123
// GetKubernetesInformers creates a informers.SharedInformerFactory for talking to a Kubernetes apiserver.

pkg/controller/controller.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ func New(name string, mrg manager.Manager, options Options) (Controller, error)
8585
Name: name,
8686
}
8787

88+
// Inject dependencies into the controller.
89+
if err := mrg.SetFields(c); err != nil {
90+
return nil, err
91+
}
92+
8893
// Add the controller as a Manager components
8994
return c, mrg.Add(c)
9095
}

pkg/internal/controller/controller.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/kubernetes-sigs/controller-runtime/pkg/handler"
2727
"github.com/kubernetes-sigs/controller-runtime/pkg/predicate"
2828
"github.com/kubernetes-sigs/controller-runtime/pkg/reconcile"
29+
"github.com/kubernetes-sigs/controller-runtime/pkg/recorder"
2930
"github.com/kubernetes-sigs/controller-runtime/pkg/runtime/inject"
3031
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
3132
"github.com/kubernetes-sigs/controller-runtime/pkg/source"
@@ -34,6 +35,7 @@ import (
3435
"k8s.io/apimachinery/pkg/util/wait"
3536
"k8s.io/client-go/rest"
3637
toolscache "k8s.io/client-go/tools/cache"
38+
"k8s.io/client-go/tools/record"
3739
"k8s.io/client-go/util/workqueue"
3840
)
3941

@@ -87,6 +89,10 @@ type Controller struct {
8789
// Started is true if the Controller has been Started
8890
Started bool
8991

92+
// recorder is an event recorder for recording Event resources to the
93+
// Kubernetes API.
94+
recorder record.EventRecorder
95+
9096
// TODO(community): Consider initializing a logger with the Controller Name as the tag
9197
}
9298

@@ -234,3 +240,21 @@ func (c *Controller) InjectFunc(f inject.Func) error {
234240
c.SetFields = f
235241
return nil
236242
}
243+
244+
var _ inject.Recorder = &Controller{}
245+
246+
// InjectRecorder is internal should be called only by the Controller.
247+
// InjectRecorder is used to inject the event recorder.
248+
func (c *Controller) InjectRecorder(provider recorder.Provider) error {
249+
if c.recorder != nil {
250+
return nil
251+
}
252+
253+
eventRecorder, err := provider.GetEventRecorderFor(c.Name)
254+
if err != nil {
255+
return err
256+
}
257+
c.recorder = eventRecorder
258+
259+
return nil
260+
}

pkg/manager/internal.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/kubernetes-sigs/controller-runtime/pkg/cache"
2323
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
24+
"github.com/kubernetes-sigs/controller-runtime/pkg/recorder"
2425
"github.com/kubernetes-sigs/controller-runtime/pkg/runtime/inject"
2526
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
2627
"k8s.io/apimachinery/pkg/runtime"
@@ -50,6 +51,10 @@ type controllerManager struct {
5051
// which can later be consumed via field selectors from the injected client.
5152
fieldIndexes client.FieldIndexer
5253

54+
// recorderProvider is used to generate event recorders that will be injected into Controllers
55+
// (and EventHandlers, Sources and Predicates).
56+
recorderProvider recorder.Provider
57+
5358
mu sync.Mutex
5459
started bool
5560
errChan chan error
@@ -93,6 +98,9 @@ func (cm *controllerManager) SetFields(i interface{}) error {
9398
if _, err := inject.CacheInto(cm.cache, i); err != nil {
9499
return err
95100
}
101+
if _, err := inject.RecorderInto(cm.recorderProvider, i); err != nil {
102+
return err
103+
}
96104
if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
97105
return err
98106
}

pkg/manager/manager.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"github.com/kubernetes-sigs/controller-runtime/pkg/cache"
2323
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
2424
"github.com/kubernetes-sigs/controller-runtime/pkg/client/apiutil"
25+
clientconfig "github.com/kubernetes-sigs/controller-runtime/pkg/client/config"
26+
"github.com/kubernetes-sigs/controller-runtime/pkg/recorder"
2527
"k8s.io/apimachinery/pkg/api/meta"
2628
"k8s.io/apimachinery/pkg/runtime"
2729
"k8s.io/client-go/kubernetes/scheme"
@@ -131,5 +133,9 @@ func New(config *rest.Config, options Options) (Manager, error) {
131133

132134
cm.fieldIndexes = cm.cache
133135
cm.client = client.DelegatingClient{ReadInterface: cm.cache, WriteInterface: writeObj}
136+
137+
cm.recorderProvider = recorder.NewProvider()
138+
cm.recorderProvider.SetScheme(cm.scheme)
139+
cm.recorderProvider.SetClientSet(clientconfig.GetKubernetesClientSetOrDieWithConfig(config))
134140
return cm, nil
135141
}

pkg/recorder/recorder.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package recorder
18+
19+
import (
20+
"fmt"
21+
22+
"github.com/golang/glog"
23+
corev1 "k8s.io/api/core/v1"
24+
"k8s.io/apimachinery/pkg/runtime"
25+
"k8s.io/client-go/kubernetes"
26+
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
27+
"k8s.io/client-go/tools/record"
28+
)
29+
30+
// Provider knows how to generate new event recorders with given name.
31+
type Provider interface {
32+
// NewRecorder returns an EventRecorder with given name.
33+
GetEventRecorderFor(name string) (record.EventRecorder, error)
34+
// SetClientSet specifies the Clientset of the provider.
35+
SetClientSet(cs *kubernetes.Clientset)
36+
// SetScheme specifies the Scheme of the provider.
37+
SetScheme(scheme *runtime.Scheme)
38+
}
39+
40+
type provider struct {
41+
clientSet *kubernetes.Clientset
42+
scheme *runtime.Scheme
43+
eventBroadcaster record.EventBroadcaster
44+
}
45+
46+
// NewProvider create a new Provider instance.
47+
func NewProvider() Provider {
48+
return &provider{}
49+
}
50+
51+
func (p *provider) SetClientSet(cs *kubernetes.Clientset) {
52+
if p.clientSet == nil {
53+
p.clientSet = cs
54+
}
55+
}
56+
57+
func (p *provider) SetScheme(scheme *runtime.Scheme) {
58+
if p.scheme == nil {
59+
p.scheme = scheme
60+
}
61+
}
62+
63+
func (p *provider) GetEventRecorderFor(name string) (record.EventRecorder, error) {
64+
if p.scheme == nil {
65+
return nil, fmt.Errorf("must call SetScheme to specify Scheme before getting recorder")
66+
}
67+
68+
if p.eventBroadcaster == nil {
69+
if p.clientSet == nil {
70+
return nil, fmt.Errorf("must call SetClientSet to specify ClientSet before getting recorder")
71+
}
72+
p.eventBroadcaster = record.NewBroadcaster()
73+
p.eventBroadcaster.StartLogging(glog.Infof)
74+
p.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.clientSet.CoreV1().Events("")})
75+
}
76+
77+
return p.eventBroadcaster.NewRecorder(p.scheme, corev1.EventSource{Component: name}), nil
78+
}

pkg/runtime/inject/inject.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package inject
1919
import (
2020
"github.com/kubernetes-sigs/controller-runtime/pkg/cache"
2121
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
22+
"github.com/kubernetes-sigs/controller-runtime/pkg/recorder"
2223
"k8s.io/apimachinery/pkg/runtime"
2324
"k8s.io/client-go/rest"
2425
)
@@ -98,6 +99,21 @@ func StopChannelInto(stop <-chan struct{}, i interface{}) (bool, error) {
9899
return false, nil
99100
}
100101

102+
// Recorder is used by the ControllerManager to inject recorder into Sources, EventHandlers, Predicates, and
103+
// Reconciles
104+
type Recorder interface {
105+
InjectRecorder(provider recorder.Provider) error
106+
}
107+
108+
// RecorderInto will set recorder and return the result on i if it implements Recorder. Returns
109+
// false if i does not implement Recorder.
110+
func RecorderInto(provider recorder.Provider, i interface{}) (bool, error) {
111+
if is, ok := i.(Recorder); ok {
112+
return true, is.InjectRecorder(provider)
113+
}
114+
return false, nil
115+
}
116+
101117
// Func injects dependencies into i.
102118
type Func func(i interface{}) error
103119

0 commit comments

Comments
 (0)