Skip to content

Try to avoid event handling leaks #1089

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/onsi/gomega v1.10.1
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
go.uber.org/goleak v1.1.10
go.uber.org/zap v1.15.0
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
gomodules.xyz/jsonpatch/v2 v2.1.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
Expand Down Expand Up @@ -374,6 +375,8 @@ go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
Expand Down Expand Up @@ -521,6 +524,7 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller_test

import (
"net/http"
"testing"

. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -45,6 +46,9 @@ var testenv *envtest.Environment
var cfg *rest.Config
var clientset *kubernetes.Clientset

// clientTransport is used to force-close keep-alives in tests that check for leaks
var clientTransport *http.Transport

var _ = BeforeSuite(func(done Done) {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

Expand All @@ -64,6 +68,9 @@ var _ = BeforeSuite(func(done Done) {
cfg, err = testenv.Start()
Expect(err).NotTo(HaveOccurred())

clientTransport = &http.Transport{}
cfg.Transport = clientTransport

clientset, err = kubernetes.NewForConfig(cfg)
Expect(err).NotTo(HaveOccurred())

Expand Down
34 changes: 23 additions & 11 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package controller_test
import (
"context"
"fmt"
rt "runtime"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"go.uber.org/goleak"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -94,27 +94,39 @@ var _ = Describe("controller.Controller", func() {
close(done)
})

// This test has been marked as pending because it has been causing lots of flakes in CI.
// It should be rewritten at some point later in the future.
XIt("should not leak goroutines when stop", func(done Done) {
// TODO(directxman12): After closing the proper leaks on watch this must be reduced to 0
// The leaks currently come from the event-related code (as in corev1.Event).
threshold := 3
It("should not leak goroutines when stopped", func() {
currentGRs := goleak.IgnoreCurrent()

m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

_, err = controller.New("new-controller", m, controller.Options{Reconciler: rec})
Expect(err).NotTo(HaveOccurred())

startGoroutines := rt.NumGoroutine()
s := make(chan struct{})
close(s)

Expect(m.Start(s)).NotTo(HaveOccurred())
Expect(rt.NumGoroutine() - startGoroutines).To(BeNumerically("<=", threshold))
Expect(m.Start(s)).To(Succeed())

close(done)
// force-close keep-alive connections. These'll time out anyway (after
// like 30s or so) but force it to speed up the tests.
clientTransport.CloseIdleConnections()
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
})

It("should not create goroutines if never started", func() {
currentGRs := goleak.IgnoreCurrent()

m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

_, err = controller.New("new-controller", m, controller.Options{Reconciler: rec})
Expect(err).NotTo(HaveOccurred())

// force-close keep-alive connections. These'll time out anyway (after
// like 30s or so) but force it to speed up the tests.
clientTransport.CloseIdleConnections()
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
})
})
})
Expand Down
126 changes: 111 additions & 15 deletions pkg/internal/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package recorder

import (
"context"
"fmt"
"sync"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand All @@ -26,35 +28,129 @@ import (
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/recorder"
)

type provider struct {
// EventBroadcasterProducer makes an event broadcaster, returning
// whether or not the broadcaster should be stopped with the Provider
// (e.g. if it's shared, it shouldn't be stopped with the Provider).
type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool)

// Provider is a recorder.Provider that records events to the k8s API server
// and to a logr Logger.
type Provider struct {
// scheme to specify when creating a recorder
scheme *runtime.Scheme
// eventBroadcaster to create new recorder instance
eventBroadcaster record.EventBroadcaster
// logger is the logger to use when logging diagnostic event info
logger logr.Logger
logger logr.Logger
evtClient typedcorev1.EventInterface
makeBroadcaster EventBroadcasterProducer

broadcasterOnce sync.Once
broadcaster record.EventBroadcaster
stopBroadcaster bool
}

// NB(directxman12): this manually implements Stop instead of being a Runnable because we need to
// stop it *after* everything else shuts down, otherwise we'll cause panics as the leader election
// code finishes up and tries to continue emitting events.

// Stop shuts this provider down, and with it the underlying broadcaster when
// the broadcaster was produced with a request to be stopped alongside the
// provider. The supplied context is honored on a best-effort basis only: the
// broadcaster's shutdown waits indefinitely for queued events to flush and
// cannot be cancelled, so when the context finishes first Stop simply returns
// and leaves that wait running in the background.
func (p *Provider) Stop(shutdownCtx context.Context) {
	finished := make(chan struct{})

	go func() {
		// Go through getBroadcaster rather than reading the fields directly so
		// that we can't race with a concurrent getBroadcaster call. This may,
		// in theory, start the broadcaster, but in practice something (e.g.
		// leader election) has almost certainly started it already.
		if caster := p.getBroadcaster(); p.stopBroadcaster {
			caster.Shutdown()
		}
		close(finished)
	}()

	// Return on whichever happens first: shutdown completing or the context ending.
	select {
	case <-finished:
	case <-shutdownCtx.Done():
	}
}

// getBroadcaster ensures that a broadcaster is started for this
// provider, and returns it. It's threadsafe.
func (p *Provider) getBroadcaster() record.EventBroadcaster {
// NB(directxman12): this can technically still leak if something calls
// "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
// create the broadcaster in start, we could race with other things that
// are started at the same time & want to emit events. The alternative is
// silently swallowing events and more locking, but that seems suboptimal.

p.broadcasterOnce.Do(func() {
broadcaster, stop := p.makeBroadcaster()
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.evtClient})

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the watch.Interface returned by StartRecordingToSink and StartEventWatcher need to be Stop()ed or is it taken care of by the above broadcaster.Shutdown()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, good catch, let me double-check

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the broadcaster handles it: that watcher is returned by apimachinery's watch.Broadcaster, which doesn't actually start a goroutine -- it just holds a channel. Shutdown on the broadcaster calls the underlying watch.Broadcaster's Shutdown, which closes Broadcaster.incoming, which breaks the loop in Broadcaster.loop, which calls m.closeAll after the loop is done, which calls close on all those open channels.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

broadcaster.StartEventWatcher(
func(e *corev1.Event) {
p.logger.V(1).Info(e.Type, "object", e.InvolvedObject, "reason", e.Reason, "message", e.Message)
})
p.broadcaster = broadcaster
p.stopBroadcaster = stop
})

return p.broadcaster
}

// NewProvider creates a new Provider instance.
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) {
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to init clientSet: %w", err)
}

p := &provider{scheme: scheme, logger: logger, eventBroadcaster: broadcaster}
p.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
p.eventBroadcaster.StartEventWatcher(
func(e *corev1.Event) {
p.logger.V(1).Info(e.Type, "object", e.InvolvedObject, "reason", e.Reason, "message", e.Message)
})

p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: clientSet.CoreV1().Events("")}
return p, nil
}

func (p *provider) GetEventRecorderFor(name string) record.EventRecorder {
return p.eventBroadcaster.NewRecorder(p.scheme, corev1.EventSource{Component: name})
// GetEventRecorderFor returns an event recorder that broadcasts to this provider's
// broadcaster. All events will be associated with a component of the given name.
func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder {
return &lazyRecorder{
prov: p,
name: name,
}
}

// lazyRecorder is a recorder that doesn't actually instantiate any underlying
// recorder until the first event is emitted.
type lazyRecorder struct {
// prov supplies the broadcaster and scheme used to build the underlying recorder.
prov *Provider
// name is the component name set on the event source of the underlying recorder.
name string

// recOnce ensures the underlying recorder is created at most once.
recOnce sync.Once
// rec is the underlying recorder; it is populated by ensureRecording.
rec record.EventRecorder
}

// ensureRecording lazily builds the concrete recorder backing this
// lazyRecorder. The work happens at most once; later calls are no-ops.
func (l *lazyRecorder) ensureRecording() {
	initRecorder := func() {
		source := corev1.EventSource{Component: l.name}
		l.rec = l.prov.getBroadcaster().NewRecorder(l.prov.scheme, source)
	}
	l.recOnce.Do(initRecorder)
}

// Event makes sure the underlying recorder has been created, then forwards
// the call to it unchanged.
func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) {
l.ensureRecording()
l.rec.Event(object, eventtype, reason, message)
}
// Eventf makes sure the underlying recorder has been created, then forwards
// the call (format string and arguments included) to it unchanged.
func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
l.ensureRecording()
l.rec.Eventf(object, eventtype, reason, messageFmt, args...)
}
// AnnotatedEventf makes sure the underlying recorder has been created, then
// forwards the call (annotations, format string and arguments included) to it
// unchanged.
func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
l.ensureRecording()
l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)
}
7 changes: 4 additions & 3 deletions pkg/internal/recorder/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
)

var _ = Describe("recorder.Provider", func() {
makeBroadcaster := func() (record.EventBroadcaster, bool) { return record.NewBroadcaster(), true }
Describe("NewProvider", func() {
It("should return a provider instance and a nil error.", func() {
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, makeBroadcaster)
Expect(provider).NotTo(BeNil())
Expect(err).NotTo(HaveOccurred())
})
Expand All @@ -37,14 +38,14 @@ var _ = Describe("recorder.Provider", func() {
// Invalid the config
cfg1 := *cfg
cfg1.Host = "invalid host"
_, err := recorder.NewProvider(&cfg1, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
_, err := recorder.NewProvider(&cfg1, scheme.Scheme, tlog.NullLogger{}, makeBroadcaster)
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(ContainSubstring("failed to init clientSet"))
})
})
Describe("GetEventRecorder", func() {
It("should return a recorder instance.", func() {
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, makeBroadcaster)
Expect(err).NotTo(HaveOccurred())

recorder := provider.GetEventRecorderFor("test")
Expand Down
10 changes: 8 additions & 2 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/recorder"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)
Expand Down Expand Up @@ -89,7 +89,7 @@ type controllerManager struct {

// recorderProvider is used to generate event recorders that will be injected into Controllers
// (and EventHandlers, Sources and Predicates).
recorderProvider recorder.Provider
recorderProvider *intrec.Provider

// resourceLock forms the basis for leader election
resourceLock resourcelock.Interface
Expand Down Expand Up @@ -535,6 +535,12 @@ func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) err
cm.mu.Lock()
defer cm.mu.Unlock()
cm.stopProcedureEngaged = true

// we want to close this after the other runnables stop, because we don't
// want things like leader election to try and emit events on a closed
// channel
defer cm.recorderProvider.Stop(cm.shutdownCtx)

return cm.waitForRunnableToEnd(cm.shutdownCtx, cancel)
}

Expand Down
Loading