Skip to content

Commit a299678

Browse files
committed
implement InjectRecorder
1 parent bdb5f7c commit a299678

File tree

7 files changed

+148
-6
lines changed

7 files changed

+148
-6
lines changed

pkg/controller/controller.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,13 @@ func New(name string, mrg manager.Manager, options Options) (Controller, error)
7676

7777
// Create controller with dependencies set
7878
c := &controller.Controller{
79-
Do: options.Reconcile,
80-
Cache: mrg.GetCache(),
81-
Config: mrg.GetConfig(),
82-
Scheme: mrg.GetScheme(),
83-
Client: mrg.GetClient(),
84-
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
79+
Do: options.Reconcile,
80+
Cache: mrg.GetCache(),
81+
Config: mrg.GetConfig(),
82+
Scheme: mrg.GetScheme(),
83+
Client: mrg.GetClient(),
84+
Recorder: mrg.GetRecorder(name),
85+
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
8586
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
8687
Name: name,
8788
}

pkg/internal/controller/controller.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3434
"k8s.io/apimachinery/pkg/util/wait"
3535
"k8s.io/client-go/rest"
36+
"k8s.io/client-go/tools/record"
3637
"k8s.io/client-go/util/workqueue"
3738
)
3839

@@ -86,6 +87,10 @@ type Controller struct {
8687
// Started is true if the Controller has been Started
8788
Started bool
8889

90+
// Recorder is an event recorder for recording Event resources to the
91+
// Kubernetes API.
92+
Recorder record.EventRecorder
93+
8994
// TODO(community): Consider initializing a logger with the Controller Name as the tag
9095
}
9196

pkg/internal/recorder/recorder.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package recorder
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
23+
"github.com/golang/glog"
24+
"github.com/kubernetes-sigs/controller-runtime/pkg/recorder"
25+
corev1 "k8s.io/api/core/v1"
26+
"k8s.io/apimachinery/pkg/runtime"
27+
"k8s.io/client-go/kubernetes"
28+
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
29+
"k8s.io/client-go/rest"
30+
"k8s.io/client-go/tools/record"
31+
)
32+
33+
type provider struct {
34+
// once ensures the eventBroadcaster will be initialized only once
35+
once sync.Once
36+
// clientSet to talk to kubernetes API server
37+
clientSet *kubernetes.Clientset
38+
// scheme to specify when creating a recorder
39+
scheme *runtime.Scheme
40+
// eventBroadcaster to create new recorder instance
41+
eventBroadcaster record.EventBroadcaster
42+
}
43+
44+
// NewProvider create a new Provider instance.
45+
func NewProvider(config *rest.Config, scheme *runtime.Scheme) (recorder.Provider, error) {
46+
clientSet, err := kubernetes.NewForConfig(config)
47+
if err != nil {
48+
return nil, fmt.Errorf("failed to init clientSet: %v", err)
49+
}
50+
return &provider{
51+
clientSet: clientSet,
52+
scheme: scheme,
53+
}, nil
54+
}
55+
56+
func (p *provider) initBroadcaster() {
57+
p.eventBroadcaster = record.NewBroadcaster()
58+
p.eventBroadcaster.StartLogging(glog.Infof)
59+
p.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.clientSet.CoreV1().Events("")})
60+
}
61+
62+
func (p *provider) GetEventRecorderFor(name string) record.EventRecorder {
63+
// Init the Broadcaster only when the first recorder is really needed.
64+
p.once.Do(p.initBroadcaster)
65+
66+
return p.eventBroadcaster.NewRecorder(p.scheme, corev1.EventSource{Component: name})
67+
}

pkg/manager/internal.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ import (
2121

2222
"github.com/kubernetes-sigs/controller-runtime/pkg/cache"
2323
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
24+
"github.com/kubernetes-sigs/controller-runtime/pkg/recorder"
2425
"github.com/kubernetes-sigs/controller-runtime/pkg/runtime/inject"
2526
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/client-go/rest"
29+
"k8s.io/client-go/tools/record"
2830
)
2931

3032
var log = logf.KBLog.WithName("manager")
@@ -50,6 +52,10 @@ type controllerManager struct {
5052
// which can later be consumed via field selectors from the injected client.
5153
fieldIndexes client.FieldIndexer
5254

55+
// recorderProvider is used to generate event recorders that will be injected into Controllers
56+
// (and EventHandlers, Sources and Predicates).
57+
recorderProvider recorder.Provider
58+
5359
mu sync.Mutex
5460
started bool
5561
errChan chan error
@@ -93,6 +99,9 @@ func (cm *controllerManager) SetFields(i interface{}) error {
9399
if _, err := inject.CacheInto(cm.cache, i); err != nil {
94100
return err
95101
}
102+
if _, err := inject.RecorderInto(cm.recorderProvider, i); err != nil {
103+
return err
104+
}
96105
if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
97106
return err
98107
}
@@ -122,6 +131,10 @@ func (cm *controllerManager) GetCache() cache.Cache {
122131
return cm.cache
123132
}
124133

134+
func (cm *controllerManager) GetRecorder(name string) record.EventRecorder {
135+
return cm.recorderProvider.GetEventRecorderFor(name)
136+
}
137+
125138
func (cm *controllerManager) Start(stop <-chan struct{}) error {
126139
func() {
127140
cm.mu.Lock()

pkg/manager/manager.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ import (
2222
"github.com/kubernetes-sigs/controller-runtime/pkg/cache"
2323
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
2424
"github.com/kubernetes-sigs/controller-runtime/pkg/client/apiutil"
25+
"github.com/kubernetes-sigs/controller-runtime/pkg/internal/recorder"
2526
"k8s.io/apimachinery/pkg/api/meta"
2627
"k8s.io/apimachinery/pkg/runtime"
2728
"k8s.io/client-go/kubernetes/scheme"
2829
"k8s.io/client-go/rest"
30+
"k8s.io/client-go/tools/record"
2931
)
3032

3133
// Manager initializes shared dependencies such as Caches and Clients, and provides them to runnables.
@@ -57,6 +59,9 @@ type Manager interface {
5759

5860
// GetCache returns a cache.Cache
5961
GetCache() cache.Cache
62+
63+
// GetRecorder returns a new EventRecorder for the provided name
64+
GetRecorder(name string) record.EventRecorder
6065
}
6166

6267
// Options are the arguments for creating a new Manager
@@ -133,5 +138,11 @@ func New(config *rest.Config, options Options) (Manager, error) {
133138

134139
cm.fieldIndexes = cm.cache
135140
cm.client = client.DelegatingClient{Reader: cm.cache, Writer: writeObj}
141+
142+
cm.recorderProvider, err = recorder.NewProvider(cm.config, cm.scheme)
143+
if err != nil {
144+
return nil, err
145+
}
146+
136147
return cm, nil
137148
}

pkg/recorder/recorder.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package recorder
18+
19+
import (
20+
"k8s.io/client-go/tools/record"
21+
)
22+
23+
// Provider knows how to generate new event recorders with given name.
24+
type Provider interface {
25+
// NewRecorder returns an EventRecorder with given name.
26+
GetEventRecorderFor(name string) record.EventRecorder
27+
}

pkg/runtime/inject/inject.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package inject
1919
import (
2020
"github.com/kubernetes-sigs/controller-runtime/pkg/cache"
2121
"github.com/kubernetes-sigs/controller-runtime/pkg/client"
22+
"github.com/kubernetes-sigs/controller-runtime/pkg/recorder"
2223
"k8s.io/apimachinery/pkg/runtime"
2324
"k8s.io/client-go/rest"
2425
)
@@ -98,6 +99,23 @@ func StopChannelInto(stop <-chan struct{}, i interface{}) (bool, error) {
9899
return false, nil
99100
}
100101

102+
// Recorder is used by the ControllerManager to inject recorder into Sources, EventHandlers, Predicates, and
103+
// Reconciles
104+
type Recorder interface {
105+
// InjectRecorder is used to inject event recorder,
106+
// a provider is passed in for recorder generation.
107+
InjectRecorder(provider recorder.Provider) error
108+
}
109+
110+
// RecorderInto will set recorder and return the result on i if it implements Recorder. Returns
111+
// false if i does not implement Recorder.
112+
func RecorderInto(provider recorder.Provider, i interface{}) (bool, error) {
113+
if is, ok := i.(Recorder); ok {
114+
return true, is.InjectRecorder(provider)
115+
}
116+
return false, nil
117+
}
118+
101119
// Func injects dependencies into i.
102120
type Func func(i interface{}) error
103121

0 commit comments

Comments
 (0)