Skip to content

Commit d045fcf

Browse files
committed
Try to avoid event handling leaks
Since we now have the ability to shut down the event broadcaster, we can write mostly goroutine-leak-free event handling setup. This changes the default event handling setup to defer the broadcaster initialization until the first time it's used, and then to shut it down once the manager shuts down. In the case where a broadcaster is manually specified, it's the specifier's job to shut down the broadcaster instead. We'll probably still want to overhaul the whole event system at some point in the future though. This also re-enables the tests for leaks, switching them to use Eventually to avoid flakes & reducing the threshold to zero.
1 parent 3715e20 commit d045fcf

File tree

6 files changed

+217
-49
lines changed

6 files changed

+217
-49
lines changed

pkg/controller/controller_test.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ package controller_test
1919
import (
2020
"context"
2121
"fmt"
22+
"os"
2223
rt "runtime"
24+
"runtime/pprof"
2325

2426
. "github.com/onsi/ginkgo"
2527
. "github.com/onsi/gomega"
@@ -94,13 +96,10 @@ var _ = Describe("controller.Controller", func() {
9496
close(done)
9597
})
9698

97-
// This test has been marked as pending because it has been causing lots of flakes in CI.
98-
// It should be rewritten at some point later in the future.
99-
XIt("should not leak goroutines when stop", func(done Done) {
100-
// TODO(directxman12): After closing the proper leaks on watch this must be reduced to 0
101-
// The leaks currently come from the event-related code (as in corev1.Event).
102-
threshold := 3
103-
99+
It("should not leak goroutines when stopped", func() {
100+
// NB(directxman12): this test was flaky before on CI, but my guess
101+
// is that the flakiness was caused by an expect on the count.
102+
// Eventually should fix it, but if not, consider disabling again.
104103
m, err := manager.New(cfg, manager.Options{})
105104
Expect(err).NotTo(HaveOccurred())
106105

@@ -112,9 +111,21 @@ var _ = Describe("controller.Controller", func() {
112111
close(s)
113112

114113
Expect(m.Start(s)).NotTo(HaveOccurred())
115-
Expect(rt.NumGoroutine() - startGoroutines).To(BeNumerically("<=", threshold))
114+
Eventually(rt.NumGoroutine /* pass the function, don't call it */).Should(Equal(startGoroutines))
115+
})
116116

117-
close(done)
117+
It("should not create goroutines if never started", func() {
118+
startGoroutines := rt.NumGoroutine()
119+
Expect(pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)).To(Succeed())
120+
121+
m, err := manager.New(cfg, manager.Options{})
122+
Expect(err).NotTo(HaveOccurred())
123+
124+
_, err = controller.New("new-controller", m, controller.Options{Reconciler: rec})
125+
Expect(err).NotTo(HaveOccurred())
126+
127+
Expect(pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)).To(Succeed())
128+
Eventually(rt.NumGoroutine /* pass func, don't call */).Should(Equal(startGoroutines))
118129
})
119130
})
120131
})

pkg/internal/recorder/recorder.go

Lines changed: 111 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ limitations under the License.
1717
package recorder
1818

1919
import (
20+
"context"
2021
"fmt"
22+
"sync"
2123

2224
"github.com/go-logr/logr"
2325
corev1 "k8s.io/api/core/v1"
@@ -26,35 +28,129 @@ import (
2628
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
2729
"k8s.io/client-go/rest"
2830
"k8s.io/client-go/tools/record"
29-
"sigs.k8s.io/controller-runtime/pkg/recorder"
3031
)
3132

32-
type provider struct {
33+
// EventBroadcasterProducer makes an event broadcaster, returning
34+
// whether or not the broadcaster should be stopped with the Provider,
35+
// (e.g. if it's shared, it shouldn't be stopped with the Provider).
36+
type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool)
37+
38+
// Provider is a recorder.Provider that records events to the k8s API server
39+
// and to a logr Logger.
40+
type Provider struct {
3341
// scheme to specify when creating a recorder
3442
scheme *runtime.Scheme
35-
// eventBroadcaster to create new recorder instance
36-
eventBroadcaster record.EventBroadcaster
3743
// logger is the logger to use when logging diagnostic event info
38-
logger logr.Logger
44+
logger logr.Logger
45+
evtClient typedcorev1.EventInterface
46+
makeBroadcaster EventBroadcasterProducer
47+
48+
broadcasterOnce sync.Once
49+
broadcaster record.EventBroadcaster
50+
stopBroadcaster bool
51+
}
52+
53+
// NB(directxman12): this manually implements Stop instead of being a runnable because we need to
54+
// stop it *after* everything else shuts down, otherwise we'll cause panics as the leader election
55+
// code finishes up and tries to continue emitting events.
56+
57+
// Stop attempts to stop this provider, stopping the underlying broadcaster
58+
// if the broadcaster asked to be stopped. It kinda tries to honor the given
59+
// context, but the underlying broadcaster has an indefinite wait that doesn't
60+
// return until all queued events are flushed, so this may end up just returning
61+
// before the underlying wait has finished instead of cancelling the wait.
62+
// This is Very Frustrating™.
63+
func (p *Provider) Stop(shutdownCtx context.Context) {
64+
doneCh := make(chan struct{})
65+
66+
go func() {
67+
// technically, this could start the broadcaster, but practically, it's
68+
// almost certainly already been started (e.g. by leader election). We
69+
// need to invoke this to ensure that we don't inadvertently race with
70+
// an invocation of getBroadcaster.
71+
broadcaster := p.getBroadcaster()
72+
if p.stopBroadcaster {
73+
broadcaster.Shutdown()
74+
}
75+
close(doneCh)
76+
}()
77+
78+
select {
79+
case <-shutdownCtx.Done():
80+
case <-doneCh:
81+
}
82+
}
83+
84+
// getBroadcaster ensures that a broadcaster is started for this
85+
// provider, and returns it. It's threadsafe.
86+
func (p *Provider) getBroadcaster() record.EventBroadcaster {
87+
// NB(directxman12): this can technically still leak if something calls
88+
// "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
89+
// create the broadcaster in start, we could race with other things that
90+
// are started at the same time & want to emit events. The alternative is
91+
// silently swallowing events and more locking, but that seems suboptimal.
92+
93+
p.broadcasterOnce.Do(func() {
94+
broadcaster, stop := p.makeBroadcaster()
95+
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.evtClient})
96+
broadcaster.StartEventWatcher(
97+
func(e *corev1.Event) {
98+
p.logger.V(1).Info(e.Type, "object", e.InvolvedObject, "reason", e.Reason, "message", e.Message)
99+
})
100+
p.broadcaster = broadcaster
101+
p.stopBroadcaster = stop
102+
})
103+
104+
return p.broadcaster
39105
}
40106

41107
// NewProvider creates a new Provider instance.
42-
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) {
108+
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {
43109
clientSet, err := kubernetes.NewForConfig(config)
44110
if err != nil {
45111
return nil, fmt.Errorf("failed to init clientSet: %w", err)
46112
}
47113

48-
p := &provider{scheme: scheme, logger: logger, eventBroadcaster: broadcaster}
49-
p.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
50-
p.eventBroadcaster.StartEventWatcher(
51-
func(e *corev1.Event) {
52-
p.logger.V(1).Info(e.Type, "object", e.InvolvedObject, "reason", e.Reason, "message", e.Message)
53-
})
54-
114+
p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: clientSet.CoreV1().Events("")}
55115
return p, nil
56116
}
57117

58-
func (p *provider) GetEventRecorderFor(name string) record.EventRecorder {
59-
return p.eventBroadcaster.NewRecorder(p.scheme, corev1.EventSource{Component: name})
118+
// GetEventRecorderFor returns an event recorder that broadcasts to this provider's
119+
// broadcaster. All events will be associated with a component of the given name.
120+
func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder {
121+
return &lazyRecorder{
122+
prov: p,
123+
name: name,
124+
}
125+
}
126+
127+
// lazyRecorder is a recorder that doesn't actually instantiate any underlying
128+
// recorder until the first event is emitted.
129+
type lazyRecorder struct {
130+
prov *Provider
131+
name string
132+
133+
recOnce sync.Once
134+
rec record.EventRecorder
135+
}
136+
137+
// ensureRecording ensures that a concrete recorder is populated for this recorder.
138+
func (l *lazyRecorder) ensureRecording() {
139+
l.recOnce.Do(func() {
140+
broadcaster := l.prov.getBroadcaster()
141+
l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
142+
})
143+
}
144+
145+
func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) {
146+
l.ensureRecording()
147+
l.rec.Event(object, eventtype, reason, message)
148+
}
149+
func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
150+
l.ensureRecording()
151+
l.rec.Eventf(object, eventtype, reason, messageFmt, args...)
152+
}
153+
func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
154+
l.ensureRecording()
155+
l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)
60156
}

pkg/internal/recorder/recorder_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ import (
2626
)
2727

2828
var _ = Describe("recorder.Provider", func() {
29+
makeBroadcaster := func() (record.EventBroadcaster, bool) { return record.NewBroadcaster(), true }
2930
Describe("NewProvider", func() {
3031
It("should return a provider instance and a nil error.", func() {
31-
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
32+
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, makeBroadcaster)
3233
Expect(provider).NotTo(BeNil())
3334
Expect(err).NotTo(HaveOccurred())
3435
})
@@ -37,14 +38,14 @@ var _ = Describe("recorder.Provider", func() {
3738
// Invalidate the config
3839
cfg1 := *cfg
3940
cfg1.Host = "invalid host"
40-
_, err := recorder.NewProvider(&cfg1, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
41+
_, err := recorder.NewProvider(&cfg1, scheme.Scheme, tlog.NullLogger{}, makeBroadcaster)
4142
Expect(err).NotTo(BeNil())
4243
Expect(err.Error()).To(ContainSubstring("failed to init clientSet"))
4344
})
4445
})
4546
Describe("GetEventRecorder", func() {
4647
It("should return a recorder instance.", func() {
47-
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
48+
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, makeBroadcaster)
4849
Expect(err).NotTo(HaveOccurred())
4950

5051
recorder := provider.GetEventRecorderFor("test")

pkg/manager/internal.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ import (
3939
"sigs.k8s.io/controller-runtime/pkg/client"
4040
"sigs.k8s.io/controller-runtime/pkg/healthz"
4141
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
42+
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
4243
"sigs.k8s.io/controller-runtime/pkg/metrics"
43-
"sigs.k8s.io/controller-runtime/pkg/recorder"
4444
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
4545
"sigs.k8s.io/controller-runtime/pkg/webhook"
4646
)
@@ -89,7 +89,7 @@ type controllerManager struct {
8989

9090
// recorderProvider is used to generate event recorders that will be injected into Controllers
9191
// (and EventHandlers, Sources and Predicates).
92-
recorderProvider recorder.Provider
92+
recorderProvider *intrec.Provider
9393

9494
// resourceLock forms the basis for leader election
9595
resourceLock resourcelock.Interface
@@ -535,6 +535,12 @@ func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) err
535535
cm.mu.Lock()
536536
defer cm.mu.Unlock()
537537
cm.stopProcedureEngaged = true
538+
539+
// we want to close this after the other runnables stop, because we don't
540+
// want things like leader election to try and emit events on a closed
541+
// channel
542+
defer cm.recorderProvider.Stop(cm.shutdownCtx)
543+
538544
return cm.waitForRunnableToEnd(cm.shutdownCtx, cancel)
539545
}
540546

pkg/manager/manager.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
"sigs.k8s.io/controller-runtime/pkg/client"
3434
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3535
"sigs.k8s.io/controller-runtime/pkg/healthz"
36-
internalrecorder "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
36+
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
3737
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
3838
logf "sigs.k8s.io/controller-runtime/pkg/log"
3939
"sigs.k8s.io/controller-runtime/pkg/metrics"
@@ -224,6 +224,9 @@ type Options struct {
224224

225225
// EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API
226226
// Use this to customize the event correlator and spam filter
227+
//
228+
// Deprecated: using this may cause goroutine leaks if the lifetime of your manager or controllers
229+
// is shorter than the lifetime of your process.
227230
EventBroadcaster record.EventBroadcaster
228231

229232
// GracefulShutdownTimeout is the duration given to runnable to stop before the manager actually returns on stop.
@@ -232,8 +235,14 @@ type Options struct {
232235
// The graceful shutdown is skipped for safety reasons in case the leader election lease is lost.
233236
GracefulShutdownTimeout *time.Duration
234237

238+
// makeBroadcaster allows deferring the creation of the broadcaster to
239+
// avoid leaking goroutines if we never call Start on this manager. It also
240+
// returns whether or not this is an "owned" broadcaster, and as such should be
241+
// stopped with the manager.
242+
makeBroadcaster intrec.EventBroadcasterProducer
243+
235244
// Dependency injection for testing
236-
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error)
245+
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error)
237246
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
238247
newMetricsListener func(addr string) (net.Listener, error)
239248
newHealthProbeListener func(addr string) (net.Listener, error)
@@ -309,7 +318,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
309318
// Create the recorder provider to inject event recorders for the components.
310319
// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
311320
// to the particular controller that it's being injected into, rather than a generic one like is here.
312-
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"), options.EventBroadcaster)
321+
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"), options.makeBroadcaster)
313322
if err != nil {
314323
return nil, err
315324
}
@@ -428,7 +437,7 @@ func setOptionsDefaults(options Options) Options {
428437

429438
// Allow newRecorderProvider to be mocked
430439
if options.newRecorderProvider == nil {
431-
options.newRecorderProvider = internalrecorder.NewProvider
440+
options.newRecorderProvider = intrec.NewProvider
432441
}
433442

434443
// Allow newResourceLock to be mocked
@@ -453,7 +462,14 @@ func setOptionsDefaults(options Options) Options {
453462
}
454463

455464
if options.EventBroadcaster == nil {
456-
options.EventBroadcaster = record.NewBroadcaster()
465+
// defer initialization to avoid leaking by default
466+
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
467+
return record.NewBroadcaster(), true
468+
}
469+
} else {
470+
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
471+
return options.EventBroadcaster, false
472+
}
457473
}
458474

459475
if options.ReadinessEndpointName == "" {

0 commit comments

Comments
 (0)