-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Implement InjectRecorder #23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
Copyright 2018 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package recorder | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/golang/glog" | ||
corev1 "k8s.io/api/core/v1" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/client-go/kubernetes" | ||
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/record" | ||
"sigs.k8s.io/controller-runtime/pkg/recorder" | ||
) | ||
|
||
type provider struct { | ||
// scheme to specify when creating a recorder | ||
scheme *runtime.Scheme | ||
// eventBroadcaster to create new recorder instance | ||
eventBroadcaster record.EventBroadcaster | ||
} | ||
|
||
// NewProvider create a new Provider instance. | ||
func NewProvider(config *rest.Config, scheme *runtime.Scheme) (recorder.Provider, error) { | ||
clientSet, err := kubernetes.NewForConfig(config) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to init clientSet: %v", err) | ||
} | ||
|
||
p := &provider{scheme: scheme} | ||
p.eventBroadcaster = record.NewBroadcaster() | ||
p.eventBroadcaster.StartLogging(glog.Infof) | ||
p.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) | ||
|
||
return p, nil | ||
} | ||
|
||
func (p *provider) GetEventRecorderFor(name string) record.EventRecorder { | ||
return p.eventBroadcaster.NewRecorder(p.scheme, corev1.EventSource{Component: name}) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* | ||
Copyright 2018 The Kubernetes Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package recorder_test | ||
|
||
import ( | ||
appsv1 "k8s.io/api/apps/v1" | ||
corev1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/watch" | ||
"k8s.io/client-go/kubernetes/scheme" | ||
ref "k8s.io/client-go/tools/reference" | ||
"sigs.k8s.io/controller-runtime/pkg/controller" | ||
"sigs.k8s.io/controller-runtime/pkg/handler" | ||
"sigs.k8s.io/controller-runtime/pkg/manager" | ||
"sigs.k8s.io/controller-runtime/pkg/reconcile" | ||
"sigs.k8s.io/controller-runtime/pkg/source" | ||
|
||
. "github.com/onsi/ginkgo" | ||
. "github.com/onsi/gomega" | ||
) | ||
|
||
var _ = Describe("recorder", func() { | ||
var stop chan struct{} | ||
|
||
BeforeEach(func() { | ||
stop = make(chan struct{}) | ||
Expect(cfg).NotTo(BeNil()) | ||
}) | ||
|
||
AfterEach(func() { | ||
close(stop) | ||
}) | ||
|
||
Describe("recorder", func() { | ||
It("should publish events", func(done Done) { | ||
By("Creating the Manager") | ||
cm, err := manager.New(cfg, manager.Options{}) | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
By("Creating the Controller") | ||
recorder := cm.GetRecorder("test-recorder") | ||
instance, err := controller.New("foo-controller", cm, controller.Options{ | ||
Reconcile: reconcile.Func( | ||
func(request reconcile.Request) (reconcile.Result, error) { | ||
dp, err := clientset.AppsV1().Deployments(request.Namespace).Get(request.Name, metav1.GetOptions{}) | ||
Expect(err).NotTo(HaveOccurred()) | ||
recorder.Event(dp, corev1.EventTypeNormal, "test-reason", "test-msg") | ||
return reconcile.Result{}, nil | ||
}), | ||
}) | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
By("Watching Resources") | ||
err = instance.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.Enqueue{}) | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
By("Starting the Manager") | ||
go func() { | ||
defer GinkgoRecover() | ||
Expect(cm.Start(stop)).NotTo(HaveOccurred()) | ||
}() | ||
|
||
deployment := &appsv1.Deployment{ | ||
ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"}, | ||
Spec: appsv1.DeploymentSpec{ | ||
Selector: &metav1.LabelSelector{ | ||
MatchLabels: map[string]string{"foo": "bar"}, | ||
}, | ||
Template: corev1.PodTemplateSpec{ | ||
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}}, | ||
Spec: corev1.PodSpec{ | ||
Containers: []corev1.Container{ | ||
{ | ||
Name: "nginx", | ||
Image: "nginx", | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
By("Invoking Reconciling") | ||
deployment, err = clientset.AppsV1().Deployments("default").Create(deployment) | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
By("Validate event is published as expected") | ||
evtWatcher, err := clientset.CoreV1().Events("default").Watch(metav1.ListOptions{}) | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
resultEvent := <-evtWatcher.ResultChan() | ||
|
||
Expect(resultEvent.Type).To(Equal(watch.Added)) | ||
evt, isEvent := resultEvent.Object.(*corev1.Event) | ||
Expect(isEvent).To(BeTrue()) | ||
|
||
dpRef, err := ref.GetReference(scheme.Scheme, deployment) | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
Expect(evt.InvolvedObject).To(Equal(*dpRef)) | ||
Expect(evt.Type).To(Equal(corev1.EventTypeNormal)) | ||
Expect(evt.Reason).To(Equal("test-reason")) | ||
Expect(evt.Message).To(Equal("test-msg")) | ||
|
||
close(done) | ||
}) | ||
}) | ||
}) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
Copyright 2018 The Kubernetes Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package recorder_test | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
. "github.com/onsi/ginkgo" | ||
. "github.com/onsi/gomega" | ||
"k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/rest" | ||
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" | ||
"sigs.k8s.io/controller-runtime/pkg/test" | ||
) | ||
|
||
func TestRecorder(t *testing.T) { | ||
RegisterFailHandler(Fail) | ||
RunSpecsWithDefaultAndCustomReporters(t, "Recorder Integration Suite", []Reporter{test.NewlineReporter{}}) | ||
} | ||
|
||
var testenv *test.Environment | ||
var cfg *rest.Config | ||
var clientset *kubernetes.Clientset | ||
|
||
var _ = BeforeSuite(func(done Done) { | ||
logf.SetLogger(logf.ZapLogger(false)) | ||
|
||
testenv = &test.Environment{} | ||
|
||
var err error | ||
cfg, err = testenv.Start() | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
time.Sleep(1 * time.Second) | ||
|
||
clientset, err = kubernetes.NewForConfig(cfg) | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
close(done) | ||
}, 60) | ||
|
||
var _ = AfterSuite(func() { | ||
testenv.Stop() | ||
}) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
Copyright 2018 The Kubernetes Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package recorder_test | ||
|
||
import ( | ||
. "github.com/onsi/ginkgo" | ||
. "github.com/onsi/gomega" | ||
"k8s.io/client-go/kubernetes/scheme" | ||
"sigs.k8s.io/controller-runtime/pkg/internal/recorder" | ||
) | ||
|
||
var _ = Describe("recorder.Provider", func() { | ||
Describe("NewProvider", func() { | ||
It("should return a provider instance and a nil error.", func() { | ||
provider, err := recorder.NewProvider(cfg, scheme.Scheme) | ||
Expect(provider).NotTo(BeNil()) | ||
Expect(err).NotTo(HaveOccurred()) | ||
}) | ||
|
||
It("should return an error if failed to init clientSet.", func() { | ||
// Invalid the config | ||
cfg1 := *cfg | ||
cfg1.ContentType = "invalid-type" | ||
_, err := recorder.NewProvider(&cfg1, scheme.Scheme) | ||
Expect(err.Error()).To(ContainSubstring("failed to init clientSet")) | ||
}) | ||
}) | ||
Describe("GetEventRecorder", func() { | ||
It("should return a recorder instance.", func() { | ||
provider, err := recorder.NewProvider(cfg, scheme.Scheme) | ||
Expect(err).NotTo(HaveOccurred()) | ||
|
||
recorder := provider.GetEventRecorderFor("test") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you also test that the event recorder will publish events? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
Expect(recorder).NotTo(BeNil()) | ||
}) | ||
}) | ||
}) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,8 +21,10 @@ import ( | |
|
||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/record" | ||
"sigs.k8s.io/controller-runtime/pkg/cache" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/recorder" | ||
"sigs.k8s.io/controller-runtime/pkg/runtime/inject" | ||
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" | ||
) | ||
|
@@ -50,6 +52,10 @@ type controllerManager struct { | |
// which can later be consumed via field selectors from the injected client. | ||
fieldIndexes client.FieldIndexer | ||
|
||
// recorderProvider is used to generate event recorders that will be injected into Controllers | ||
// (and EventHandlers, Sources and Predicates). | ||
recorderProvider recorder.Provider | ||
|
||
mu sync.Mutex | ||
started bool | ||
errChan chan error | ||
|
@@ -122,6 +128,10 @@ func (cm *controllerManager) GetCache() cache.Cache { | |
return cm.cache | ||
} | ||
|
||
func (cm *controllerManager) GetRecorder(name string) record.EventRecorder { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a comment for this function |
||
return cm.recorderProvider.GetEventRecorderFor(name) | ||
} | ||
|
||
func (cm *controllerManager) Start(stop <-chan struct{}) error { | ||
func() { | ||
cm.mu.Lock() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an awesome way to both test the watching of these events and make the test run faster without polling!