Skip to content

Commit aca66a0

Browse files
committed
🐛 Controller: Wait for all reconciliations before shutting down
Currently, the controller will instantly shutdown and return when its context gets cancelled, leaving active reconciliations be. This change makes it wait for those before finishing shutdown.
1 parent b125a18 commit aca66a0

File tree

3 files changed

+46
-20
lines changed

3 files changed

+46
-20
lines changed

pkg/controller/controller_test.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,21 @@ package controller_test
1919
import (
2020
"context"
2121
"fmt"
22+
"time"
2223

2324
. "github.com/onsi/ginkgo"
2425
. "github.com/onsi/gomega"
2526
"go.uber.org/goleak"
27+
corev1 "k8s.io/api/core/v1"
2628

2729
"sigs.k8s.io/controller-runtime/pkg/client"
2830
"sigs.k8s.io/controller-runtime/pkg/controller"
31+
"sigs.k8s.io/controller-runtime/pkg/event"
32+
"sigs.k8s.io/controller-runtime/pkg/handler"
2933
"sigs.k8s.io/controller-runtime/pkg/manager"
3034
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3135
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
36+
"sigs.k8s.io/controller-runtime/pkg/source"
3237
)
3338

3439
var _ = Describe("controller.Controller", func() {
@@ -88,15 +93,42 @@ var _ = Describe("controller.Controller", func() {
8893
It("should not leak goroutines when stopped", func() {
8994
currentGRs := goleak.IgnoreCurrent()
9095

96+
watchChan := make(chan event.GenericEvent, 1)
97+
watch := &source.Channel{Source: watchChan}
98+
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}
99+
100+
reconcileStarted := make(chan struct{})
101+
controllerFinished := make(chan struct{})
102+
rec := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
103+
defer GinkgoRecover()
104+
close(reconcileStarted)
105+
// Make sure reconciliation takes a moment and is not quicker than the controllers
106+
// shutdown.
107+
time.Sleep(50 * time.Millisecond)
108+
// Explicitly test this on top of the leakdetection, as the latter uses Eventually
109+
// so might succeed even when the controller does not wait for all reconciliations
110+
// to finish.
111+
Expect(controllerFinished).NotTo(BeClosed())
112+
return reconcile.Result{}, nil
113+
})
114+
91115
m, err := manager.New(cfg, manager.Options{})
92116
Expect(err).NotTo(HaveOccurred())
93117

94-
_, err = controller.New("new-controller", m, controller.Options{Reconciler: rec})
118+
c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
119+
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
95120
Expect(err).NotTo(HaveOccurred())
96121

97122
ctx, cancel := context.WithCancel(context.Background())
123+
go func() {
124+
defer GinkgoRecover()
125+
Expect(m.Start(ctx)).To(Succeed())
126+
close(controllerFinished)
127+
}()
128+
129+
<-reconcileStarted
98130
cancel()
99-
Expect(m.Start(ctx)).To(Succeed())
131+
<-controllerFinished
100132

101133
// force-close keep-alive connections. These'll time anyway (after
102134
// like 30s or so) but force it to speed up the tests.

pkg/internal/controller/controller.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525

2626
"github.com/go-logr/logr"
2727
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28-
"k8s.io/apimachinery/pkg/util/wait"
2928
"k8s.io/client-go/util/workqueue"
3029
"sigs.k8s.io/controller-runtime/pkg/handler"
3130
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
@@ -67,9 +66,6 @@ type Controller struct {
6766
// mu is used to synchronize Controller setup
6867
mu sync.Mutex
6968

70-
// JitterPeriod allows tests to reduce the JitterPeriod so they complete faster
71-
JitterPeriod time.Duration
72-
7369
// Started is true if the Controller has been Started
7470
Started bool
7571

@@ -150,8 +146,12 @@ func (c *Controller) Start(ctx context.Context) error {
150146
c.ctx = ctx
151147

152148
c.Queue = c.MakeQueue()
153-
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed
149+
go func() {
150+
<-ctx.Done()
151+
c.Queue.ShutDown()
152+
}()
154153

154+
wg := &sync.WaitGroup{}
155155
err := func() error {
156156
defer c.mu.Unlock()
157157

@@ -203,19 +203,17 @@ func (c *Controller) Start(ctx context.Context) error {
203203
// which won't be garbage collected if we hold a reference to it.
204204
c.startWatches = nil
205205

206-
if c.JitterPeriod == 0 {
207-
c.JitterPeriod = 1 * time.Second
208-
}
209-
210206
// Launch workers to process resources
211207
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
208+
wg.Add(c.MaxConcurrentReconciles)
212209
for i := 0; i < c.MaxConcurrentReconciles; i++ {
213-
go wait.UntilWithContext(ctx, func(ctx context.Context) {
210+
go func() {
211+
defer wg.Done()
214212
// Run a worker thread that just dequeues items, processes them, and marks them done.
215213
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
216214
for c.processNextWorkItem(ctx) {
217215
}
218-
}, c.JitterPeriod)
216+
}()
219217
}
220218

221219
c.Started = true
@@ -226,7 +224,9 @@ func (c *Controller) Start(ctx context.Context) error {
226224
}
227225

228226
<-ctx.Done()
229-
c.Log.Info("Stopping workers")
227+
c.Log.Info("Shutdown signal received, waiting for all workers to finish")
228+
wg.Wait()
229+
c.Log.Info("All workers finished")
230230
return nil
231231
}
232232

pkg/internal/controller/controller_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -469,8 +469,6 @@ var _ = Describe("controller", func() {
469469
})
470470

471471
It("should requeue a Request if there is an error and continue processing items", func(done Done) {
472-
// Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun.
473-
ctrl.JitterPeriod = time.Millisecond
474472

475473
ctx, cancel := context.WithCancel(context.Background())
476474
defer cancel()
@@ -597,7 +595,6 @@ var _ = Describe("controller", func() {
597595
It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
598596
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
599597
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
600-
ctrl.JitterPeriod = time.Millisecond
601598

602599
ctx, cancel := context.WithCancel(context.Background())
603600
defer cancel()
@@ -790,9 +787,6 @@ var _ = Describe("controller", func() {
790787
}()
791788
queue.Add(request)
792789

793-
// Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun.
794-
ctrl.JitterPeriod = time.Millisecond
795-
796790
By("Invoking Reconciler which will give an error")
797791
fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
798792
Expect(<-reconciled).To(Equal(request))

0 commit comments

Comments
 (0)