Skip to content

Commit e169648

Browse files
committed
implement InjectRecorder
1 parent bdb5f7c commit e169648

File tree

6 files changed

+160
-20
lines changed

6 files changed

+160
-20
lines changed

pkg/controller/controller.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,13 @@ func New(name string, mrg manager.Manager, options Options) (Controller, error)
7676

7777
// Create controller with dependencies set
7878
c := &controller.Controller{
79-
Do: options.Reconcile,
80-
Cache: mrg.GetCache(),
81-
Config: mrg.GetConfig(),
82-
Scheme: mrg.GetScheme(),
83-
Client: mrg.GetClient(),
84-
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
79+
Do: options.Reconcile,
80+
Cache: mrg.GetCache(),
81+
Config: mrg.GetConfig(),
82+
Scheme: mrg.GetScheme(),
83+
Client: mrg.GetClient(),
84+
Recorder: mrg.GetRecorder(name),
85+
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
8586
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
8687
Name: name,
8788
}

pkg/internal/controller/controller.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3434
"k8s.io/apimachinery/pkg/util/wait"
3535
"k8s.io/client-go/rest"
36+
"k8s.io/client-go/tools/record"
3637
"k8s.io/client-go/util/workqueue"
3738
)
3839

@@ -86,6 +87,10 @@ type Controller struct {
8687
// Started is true if the Controller has been Started
8788
Started bool
8889

90+
// Recorder is an event recorder for recording Event resources to the
91+
// Kubernetes API.
92+
Recorder record.EventRecorder
93+
8994
// TODO(community): Consider initializing a logger with the Controller Name as the tag
9095
}
9196

pkg/internal/recorder/recorder.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package recorder
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
23+
"github.com/golang/glog"
24+
"github.com/kubernetes-sigs/controller-runtime/pkg/recorder"
25+
corev1 "k8s.io/api/core/v1"
26+
"k8s.io/apimachinery/pkg/runtime"
27+
"k8s.io/client-go/kubernetes"
28+
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
29+
"k8s.io/client-go/rest"
30+
"k8s.io/client-go/tools/record"
31+
)
32+
33+
type provider struct {
34+
// once ensures the eventBroadcaster will be initialized only once
35+
once sync.Once
36+
// clientSet to talk to kubernetes API server
37+
clientSet *kubernetes.Clientset
38+
// scheme to specify when creating a recorder
39+
scheme *runtime.Scheme
40+
// eventBroadcaster to create new recorder instance
41+
eventBroadcaster record.EventBroadcaster
42+
}
43+
44+
// NewProvider create a new Provider instance.
45+
func NewProvider(config *rest.Config, scheme *runtime.Scheme) (recorder.Provider, error) {
46+
clientSet, err := kubernetes.NewForConfig(config)
47+
if err != nil {
48+
return nil, fmt.Errorf("failed to init clientSet: %v", err)
49+
}
50+
return &provider{
51+
clientSet: clientSet,
52+
scheme: scheme,
53+
}, nil
54+
}
55+
56+
func (p *provider) initBroadcaster() {
57+
p.eventBroadcaster = record.NewBroadcaster()
58+
p.eventBroadcaster.StartLogging(glog.Infof)
59+
p.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.clientSet.CoreV1().Events("")})
60+
}
61+
62+
func (p *provider) GetEventRecorderFor(name string) record.EventRecorder {
63+
// Init the Broadcaster only when the first recorder is really needed.
64+
p.once.Do(p.initBroadcaster)
65+
66+
return p.eventBroadcaster.NewRecorder(p.scheme, corev1.EventSource{Component: name})
67+
}

pkg/manager/internal.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ import (
2121

2222
"github.com/kubernetes-sigs/controller-runtime/pkg/cache"
2323
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
24+
"github.com/kubernetes-sigs/controller-runtime/pkg/recorder"
2425
"github.com/kubernetes-sigs/controller-runtime/pkg/runtime/inject"
2526
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/client-go/rest"
29+
"k8s.io/client-go/tools/record"
2830
)
2931

3032
var log = logf.KBLog.WithName("manager")
@@ -50,6 +52,10 @@ type controllerManager struct {
5052
// which can later be consumed via field selectors from the injected client.
5153
fieldIndexes client.FieldIndexer
5254

55+
// recorderProvider is used to generate event recorders that will be injected into Controllers
56+
// (and EventHandlers, Sources and Predicates).
57+
recorderProvider recorder.Provider
58+
5359
mu sync.Mutex
5460
started bool
5561
errChan chan error
@@ -122,6 +128,10 @@ func (cm *controllerManager) GetCache() cache.Cache {
122128
return cm.cache
123129
}
124130

131+
func (cm *controllerManager) GetRecorder(name string) record.EventRecorder {
132+
return cm.recorderProvider.GetEventRecorderFor(name)
133+
}
134+
125135
func (cm *controllerManager) Start(stop <-chan struct{}) error {
126136
func() {
127137
cm.mu.Lock()

pkg/manager/manager.go

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@ import (
2222
"github.com/kubernetes-sigs/controller-runtime/pkg/cache"
2323
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
2424
"github.com/kubernetes-sigs/controller-runtime/pkg/client/apiutil"
25+
internalrecorder "github.com/kubernetes-sigs/controller-runtime/pkg/internal/recorder"
26+
"github.com/kubernetes-sigs/controller-runtime/pkg/recorder"
2527
"k8s.io/apimachinery/pkg/api/meta"
2628
"k8s.io/apimachinery/pkg/runtime"
2729
"k8s.io/client-go/kubernetes/scheme"
2830
"k8s.io/client-go/rest"
31+
"k8s.io/client-go/tools/record"
2932
)
3033

3134
// Manager initializes shared dependencies such as Caches and Clients, and provides them to runnables.
@@ -57,6 +60,9 @@ type Manager interface {
5760

5861
// GetCache returns a cache.Cache
5962
GetCache() cache.Cache
63+
64+
// GetRecorder returns a new EventRecorder for the provided name
65+
GetRecorder(name string) record.EventRecorder
6066
}
6167

6268
// Options are the arguments for creating a new Manager
@@ -69,8 +75,9 @@ type Options struct {
6975
MapperProvider func(c *rest.Config) (meta.RESTMapper, error)
7076

7177
// Dependency injection for testing
72-
newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error)
73-
newClient func(config *rest.Config, options client.Options) (client.Client, error)
78+
newCache func(config *rest.Config, opts cache.Options) (cache.Cache, error)
79+
newClient func(config *rest.Config, options client.Options) (client.Client, error)
80+
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme) (recorder.Provider, error)
7481
}
7582

7683
// Runnable allows a component to be started.
@@ -102,36 +109,59 @@ func New(config *rest.Config, options Options) (Manager, error) {
102109
cm.scheme = scheme.Scheme
103110
}
104111

105-
// Create a new RESTMapper for mapping GroupVersionKinds to Resources
106-
if options.MapperProvider == nil {
107-
options.MapperProvider = apiutil.NewDiscoveryRESTMapper
108-
}
112+
// Set default values for options fields
113+
options = setOptionsDefaults(options)
114+
109115
mapper, err := options.MapperProvider(cm.config)
110116
if err != nil {
111117
log.Error(err, "Failed to get API Group-Resources")
112118
return nil, err
113119
}
114120

115-
// Allow newClient to be mocked
116-
if options.newClient == nil {
117-
options.newClient = client.New
118-
}
119121
// Create the Client for Write operations.
120122
writeObj, err := options.newClient(cm.config, client.Options{Scheme: cm.scheme, Mapper: mapper})
121123
if err != nil {
122124
return nil, err
123125
}
124126

125-
// TODO(directxman12): Figure out how to allow users to request a client without requesting a watch
126-
if options.newCache == nil {
127-
options.newCache = cache.New
128-
}
129127
cm.cache, err = options.newCache(cm.config, cache.Options{Scheme: cm.scheme, Mapper: mapper})
130128
if err != nil {
131129
return nil, err
132130
}
133131

134132
cm.fieldIndexes = cm.cache
135133
cm.client = client.DelegatingClient{Reader: cm.cache, Writer: writeObj}
134+
135+
// Create the recorder provider to inject event recorders for the components.
136+
cm.recorderProvider, err = options.newRecorderProvider(cm.config, cm.scheme)
137+
if err != nil {
138+
return nil, err
139+
}
140+
136141
return cm, nil
137142
}
143+
144+
// setOptionsDefaults set default values for Options fields
145+
func setOptionsDefaults(options Options) Options {
146+
if options.MapperProvider == nil {
147+
options.MapperProvider = apiutil.NewDiscoveryRESTMapper
148+
}
149+
150+
// Allow newClient to be mocked
151+
if options.newClient == nil {
152+
options.newClient = client.New
153+
}
154+
155+
// Allow newCache to be mocked
156+
// TODO(directxman12): Figure out how to allow users to request a client without requesting a watch
157+
if options.newCache == nil {
158+
options.newCache = cache.New
159+
}
160+
161+
// Allow newRecorderProvider to be mocked
162+
if options.newRecorderProvider == nil {
163+
options.newRecorderProvider = internalrecorder.NewProvider
164+
}
165+
166+
return options
167+
}

pkg/recorder/recorder.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package recorder
18+
19+
import (
20+
"k8s.io/client-go/tools/record"
21+
)
22+
23+
// Provider knows how to generate new event recorders with given name.
24+
type Provider interface {
25+
// NewRecorder returns an EventRecorder with given name.
26+
GetEventRecorderFor(name string) record.EventRecorder
27+
}

0 commit comments

Comments
 (0)