Skip to content

Commit 011cd8a

Browse files
authored
Merge pull request #1089 from DirectXMan12/feature/leakless-events
Try to avoid event handling leaks
2 parents 3715e20 + b269400 commit 011cd8a

File tree

10 files changed

+243
-53
lines changed

10 files changed

+243
-53
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/onsi/gomega v1.10.1
1616
github.com/prometheus/client_golang v1.7.1
1717
github.com/prometheus/client_model v0.2.0
18+
go.uber.org/goleak v1.1.10
1819
go.uber.org/zap v1.15.0
1920
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
2021
gomodules.xyz/jsonpatch/v2 v2.1.0

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7
255255
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
256256
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
257257
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
258+
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
258259
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
259260
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI=
260261
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
@@ -374,6 +375,8 @@ go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
374375
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
375376
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
376377
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
378+
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
379+
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
377380
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
378381
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
379382
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
@@ -521,6 +524,7 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn
521524
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
522525
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
523526
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
527+
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
524528
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
525529
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
526530
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=

pkg/controller/controller_suite_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package controller_test
1818

1919
import (
20+
"net/http"
2021
"testing"
2122

2223
. "github.com/onsi/ginkgo"
@@ -45,6 +46,9 @@ var testenv *envtest.Environment
4546
var cfg *rest.Config
4647
var clientset *kubernetes.Clientset
4748

49+
// clientTransport is used to force-close keep-alives in tests that check for leaks
50+
var clientTransport *http.Transport
51+
4852
var _ = BeforeSuite(func(done Done) {
4953
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
5054

@@ -64,6 +68,9 @@ var _ = BeforeSuite(func(done Done) {
6468
cfg, err = testenv.Start()
6569
Expect(err).NotTo(HaveOccurred())
6670

71+
clientTransport = &http.Transport{}
72+
cfg.Transport = clientTransport
73+
6774
clientset, err = kubernetes.NewForConfig(cfg)
6875
Expect(err).NotTo(HaveOccurred())
6976

pkg/controller/controller_test.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ package controller_test
1919
import (
2020
"context"
2121
"fmt"
22-
rt "runtime"
2322

2423
. "github.com/onsi/ginkgo"
2524
. "github.com/onsi/gomega"
25+
"go.uber.org/goleak"
2626

2727
"sigs.k8s.io/controller-runtime/pkg/client"
2828
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -94,27 +94,39 @@ var _ = Describe("controller.Controller", func() {
9494
close(done)
9595
})
9696

97-
// This test has been marked as pending because it has been causing lots of flakes in CI.
98-
// It should be rewritten at some point later in the future.
99-
XIt("should not leak goroutines when stop", func(done Done) {
100-
// TODO(directxman12): After closing the proper leaks on watch this must be reduced to 0
101-
// The leaks currently come from the event-related code (as in corev1.Event).
102-
threshold := 3
97+
It("should not leak goroutines when stopped", func() {
98+
currentGRs := goleak.IgnoreCurrent()
10399

104100
m, err := manager.New(cfg, manager.Options{})
105101
Expect(err).NotTo(HaveOccurred())
106102

107103
_, err = controller.New("new-controller", m, controller.Options{Reconciler: rec})
108104
Expect(err).NotTo(HaveOccurred())
109105

110-
startGoroutines := rt.NumGoroutine()
111106
s := make(chan struct{})
112107
close(s)
113108

114-
Expect(m.Start(s)).NotTo(HaveOccurred())
115-
Expect(rt.NumGoroutine() - startGoroutines).To(BeNumerically("<=", threshold))
109+
Expect(m.Start(s)).To(Succeed())
116110

117-
close(done)
111+
// force-close keep-alive connections. These'll time anyway (after
112+
// like 30s or so) but force it to speed up the tests.
113+
clientTransport.CloseIdleConnections()
114+
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
115+
})
116+
117+
It("should not create goroutines if never started", func() {
118+
currentGRs := goleak.IgnoreCurrent()
119+
120+
m, err := manager.New(cfg, manager.Options{})
121+
Expect(err).NotTo(HaveOccurred())
122+
123+
_, err = controller.New("new-controller", m, controller.Options{Reconciler: rec})
124+
Expect(err).NotTo(HaveOccurred())
125+
126+
// force-close keep-alive connections. These'll time anyway (after
127+
// like 30s or so) but force it to speed up the tests.
128+
clientTransport.CloseIdleConnections()
129+
Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
118130
})
119131
})
120132
})

pkg/internal/recorder/recorder.go

Lines changed: 111 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ limitations under the License.
1717
package recorder
1818

1919
import (
20+
"context"
2021
"fmt"
22+
"sync"
2123

2224
"github.com/go-logr/logr"
2325
corev1 "k8s.io/api/core/v1"
@@ -26,35 +28,129 @@ import (
2628
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
2729
"k8s.io/client-go/rest"
2830
"k8s.io/client-go/tools/record"
29-
"sigs.k8s.io/controller-runtime/pkg/recorder"
3031
)
3132

32-
type provider struct {
33+
// EventBroadcasterProducer makes an event broadcaster, returning
34+
// whether or not the broadcaster should be stopped with the Provider,
35+
// or not (e.g. if it's shared, it shouldn't be stopped with the Provider).
36+
type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool)
37+
38+
// Provider is a recorder.Provider that records events to the k8s API server
39+
// and to a logr Logger.
40+
type Provider struct {
3341
// scheme to specify when creating a recorder
3442
scheme *runtime.Scheme
35-
// eventBroadcaster to create new recorder instance
36-
eventBroadcaster record.EventBroadcaster
3743
// logger is the logger to use when logging diagnostic event info
38-
logger logr.Logger
44+
logger logr.Logger
45+
evtClient typedcorev1.EventInterface
46+
makeBroadcaster EventBroadcasterProducer
47+
48+
broadcasterOnce sync.Once
49+
broadcaster record.EventBroadcaster
50+
stopBroadcaster bool
51+
}
52+
53+
// NB(directxman12): this manually implements Stop instead of Being a runnable because we need to
54+
// stop it *after* everything else shuts down, otherwise we'll cause panics as the leader election
55+
// code finishes up and tries to continue emitting events.
56+
57+
// Stop attempts to stop this provider, stopping the underlying broadcaster
58+
// if the broadcaster asked to be stopped. It kinda tries to honor the given
59+
// context, but the underlying broadcaster has an indefinite wait that doesn't
60+
// return until all queued events are flushed, so this may end up just returning
61+
// before the underlying wait has finished instead of cancelling the wait.
62+
// This is Very Frustrating™.
63+
func (p *Provider) Stop(shutdownCtx context.Context) {
64+
doneCh := make(chan struct{})
65+
66+
go func() {
67+
// technically, this could start the broadcaster, but practically, it's
68+
// almost certainly already been started (e.g. by leader election). We
69+
// need to invoke this to ensure that we don't inadvertently race with
70+
// an invocation of getBroadcaster.
71+
broadcaster := p.getBroadcaster()
72+
if p.stopBroadcaster {
73+
broadcaster.Shutdown()
74+
}
75+
close(doneCh)
76+
}()
77+
78+
select {
79+
case <-shutdownCtx.Done():
80+
case <-doneCh:
81+
}
82+
}
83+
84+
// getBroadcaster ensures that a broadcaster is started for this
85+
// provider, and returns it. It's threadsafe.
86+
func (p *Provider) getBroadcaster() record.EventBroadcaster {
87+
// NB(directxman12): this can technically still leak if something calls
88+
// "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
89+
// create the broadcaster in start, we could race with other things that
90+
// are started at the same time & want to emit events. The alternative is
91+
// silently swallowing events and more locking, but that seems suboptimal.
92+
93+
p.broadcasterOnce.Do(func() {
94+
broadcaster, stop := p.makeBroadcaster()
95+
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: p.evtClient})
96+
broadcaster.StartEventWatcher(
97+
func(e *corev1.Event) {
98+
p.logger.V(1).Info(e.Type, "object", e.InvolvedObject, "reason", e.Reason, "message", e.Message)
99+
})
100+
p.broadcaster = broadcaster
101+
p.stopBroadcaster = stop
102+
})
103+
104+
return p.broadcaster
39105
}
40106

41107
// NewProvider create a new Provider instance.
42-
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) {
108+
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {
43109
clientSet, err := kubernetes.NewForConfig(config)
44110
if err != nil {
45111
return nil, fmt.Errorf("failed to init clientSet: %w", err)
46112
}
47113

48-
p := &provider{scheme: scheme, logger: logger, eventBroadcaster: broadcaster}
49-
p.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
50-
p.eventBroadcaster.StartEventWatcher(
51-
func(e *corev1.Event) {
52-
p.logger.V(1).Info(e.Type, "object", e.InvolvedObject, "reason", e.Reason, "message", e.Message)
53-
})
54-
114+
p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: clientSet.CoreV1().Events("")}
55115
return p, nil
56116
}
57117

58-
func (p *provider) GetEventRecorderFor(name string) record.EventRecorder {
59-
return p.eventBroadcaster.NewRecorder(p.scheme, corev1.EventSource{Component: name})
118+
// GetEventRecorderFor returns an event recorder that broadcasts to this provider's
119+
// broadcaster. All events will be associated with a component of the given name.
120+
func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder {
121+
return &lazyRecorder{
122+
prov: p,
123+
name: name,
124+
}
125+
}
126+
127+
// lazyRecorder is a recorder that doesn't actually instantiate any underlying
128+
// recorder until the first event is emitted.
129+
type lazyRecorder struct {
130+
prov *Provider
131+
name string
132+
133+
recOnce sync.Once
134+
rec record.EventRecorder
135+
}
136+
137+
// ensureRecording ensures that a concrete recorder is populated for this recorder.
138+
func (l *lazyRecorder) ensureRecording() {
139+
l.recOnce.Do(func() {
140+
broadcaster := l.prov.getBroadcaster()
141+
l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
142+
})
143+
}
144+
145+
func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) {
146+
l.ensureRecording()
147+
l.rec.Event(object, eventtype, reason, message)
148+
}
149+
func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
150+
l.ensureRecording()
151+
l.rec.Eventf(object, eventtype, reason, messageFmt, args...)
152+
}
153+
func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
154+
l.ensureRecording()
155+
l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)
60156
}

pkg/internal/recorder/recorder_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ import (
2626
)
2727

2828
var _ = Describe("recorder.Provider", func() {
29+
makeBroadcaster := func() (record.EventBroadcaster, bool) { return record.NewBroadcaster(), true }
2930
Describe("NewProvider", func() {
3031
It("should return a provider instance and a nil error.", func() {
31-
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
32+
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, makeBroadcaster)
3233
Expect(provider).NotTo(BeNil())
3334
Expect(err).NotTo(HaveOccurred())
3435
})
@@ -37,14 +38,14 @@ var _ = Describe("recorder.Provider", func() {
3738
// Invalid the config
3839
cfg1 := *cfg
3940
cfg1.Host = "invalid host"
40-
_, err := recorder.NewProvider(&cfg1, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
41+
_, err := recorder.NewProvider(&cfg1, scheme.Scheme, tlog.NullLogger{}, makeBroadcaster)
4142
Expect(err).NotTo(BeNil())
4243
Expect(err.Error()).To(ContainSubstring("failed to init clientSet"))
4344
})
4445
})
4546
Describe("GetEventRecorder", func() {
4647
It("should return a recorder instance.", func() {
47-
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, record.NewBroadcaster())
48+
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, makeBroadcaster)
4849
Expect(err).NotTo(HaveOccurred())
4950

5051
recorder := provider.GetEventRecorderFor("test")

pkg/manager/internal.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ import (
3939
"sigs.k8s.io/controller-runtime/pkg/client"
4040
"sigs.k8s.io/controller-runtime/pkg/healthz"
4141
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
42+
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
4243
"sigs.k8s.io/controller-runtime/pkg/metrics"
43-
"sigs.k8s.io/controller-runtime/pkg/recorder"
4444
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
4545
"sigs.k8s.io/controller-runtime/pkg/webhook"
4646
)
@@ -89,7 +89,7 @@ type controllerManager struct {
8989

9090
// recorderProvider is used to generate event recorders that will be injected into Controllers
9191
// (and EventHandlers, Sources and Predicates).
92-
recorderProvider recorder.Provider
92+
recorderProvider *intrec.Provider
9393

9494
// resourceLock forms the basis for leader election
9595
resourceLock resourcelock.Interface
@@ -535,6 +535,12 @@ func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) err
535535
cm.mu.Lock()
536536
defer cm.mu.Unlock()
537537
cm.stopProcedureEngaged = true
538+
539+
// we want to close this after the other runnables stop, because we don't
540+
// want things like leader election to try and emit events on a closed
541+
// channel
542+
defer cm.recorderProvider.Stop(cm.shutdownCtx)
543+
538544
return cm.waitForRunnableToEnd(cm.shutdownCtx, cancel)
539545
}
540546

0 commit comments

Comments
 (0)