Skip to content

Commit df2c43d

Browse files
authored
Merge pull request #1427 from alvaroaleman/fix-wait
🐛 Controller: Wait for all reconciliations before shutting down
2 parents 253f275 + aca66a0 commit df2c43d

File tree

3 files changed

+46
-20
lines changed

3 files changed

+46
-20
lines changed

pkg/controller/controller_test.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,21 @@ package controller_test
1919
import (
2020
"context"
2121
"fmt"
22+
"time"
2223

2324
. "github.com/onsi/ginkgo"
2425
. "github.com/onsi/gomega"
2526
"go.uber.org/goleak"
27+
corev1 "k8s.io/api/core/v1"
2628

2729
"sigs.k8s.io/controller-runtime/pkg/client"
2830
"sigs.k8s.io/controller-runtime/pkg/controller"
31+
"sigs.k8s.io/controller-runtime/pkg/event"
32+
"sigs.k8s.io/controller-runtime/pkg/handler"
2933
"sigs.k8s.io/controller-runtime/pkg/manager"
3034
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3135
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
36+
"sigs.k8s.io/controller-runtime/pkg/source"
3237
)
3338

3439
var _ = Describe("controller.Controller", func() {
@@ -88,15 +93,42 @@ var _ = Describe("controller.Controller", func() {
8893
It("should not leak goroutines when stopped", func() {
8994
currentGRs := goleak.IgnoreCurrent()
9095

96+
watchChan := make(chan event.GenericEvent, 1)
97+
watch := &source.Channel{Source: watchChan}
98+
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}
99+
100+
reconcileStarted := make(chan struct{})
101+
controllerFinished := make(chan struct{})
102+
rec := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
103+
defer GinkgoRecover()
104+
close(reconcileStarted)
105+
// Make sure reconciliation takes a moment and is not quicker than the controllers
106+
// shutdown.
107+
time.Sleep(50 * time.Millisecond)
108+
// Explicitly test this on top of the leakdetection, as the latter uses Eventually
109+
// so might succeed even when the controller does not wait for all reconciliations
110+
// to finish.
111+
Expect(controllerFinished).NotTo(BeClosed())
112+
return reconcile.Result{}, nil
113+
})
114+
91115
m, err := manager.New(cfg, manager.Options{})
92116
Expect(err).NotTo(HaveOccurred())
93117

94-
_, err = controller.New("new-controller", m, controller.Options{Reconciler: rec})
118+
c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
119+
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
95120
Expect(err).NotTo(HaveOccurred())
96121

97122
ctx, cancel := context.WithCancel(context.Background())
123+
go func() {
124+
defer GinkgoRecover()
125+
Expect(m.Start(ctx)).To(Succeed())
126+
close(controllerFinished)
127+
}()
128+
129+
<-reconcileStarted
98130
cancel()
99-
Expect(m.Start(ctx)).To(Succeed())
131+
<-controllerFinished
100132

101133
// force-close keep-alive connections. These'll time anyway (after
102134
// like 30s or so) but force it to speed up the tests.

pkg/internal/controller/controller.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525

2626
"github.com/go-logr/logr"
2727
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28-
"k8s.io/apimachinery/pkg/util/wait"
2928
"k8s.io/client-go/util/workqueue"
3029
"sigs.k8s.io/controller-runtime/pkg/handler"
3130
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
@@ -67,9 +66,6 @@ type Controller struct {
6766
// mu is used to synchronize Controller setup
6867
mu sync.Mutex
6968

70-
// JitterPeriod allows tests to reduce the JitterPeriod so they complete faster
71-
JitterPeriod time.Duration
72-
7369
// Started is true if the Controller has been Started
7470
Started bool
7571

@@ -150,8 +146,12 @@ func (c *Controller) Start(ctx context.Context) error {
150146
c.ctx = ctx
151147

152148
c.Queue = c.MakeQueue()
153-
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed
149+
go func() {
150+
<-ctx.Done()
151+
c.Queue.ShutDown()
152+
}()
154153

154+
wg := &sync.WaitGroup{}
155155
err := func() error {
156156
defer c.mu.Unlock()
157157

@@ -203,19 +203,17 @@ func (c *Controller) Start(ctx context.Context) error {
203203
// which won't be garbage collected if we hold a reference to it.
204204
c.startWatches = nil
205205

206-
if c.JitterPeriod == 0 {
207-
c.JitterPeriod = 1 * time.Second
208-
}
209-
210206
// Launch workers to process resources
211207
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
208+
wg.Add(c.MaxConcurrentReconciles)
212209
for i := 0; i < c.MaxConcurrentReconciles; i++ {
213-
go wait.UntilWithContext(ctx, func(ctx context.Context) {
210+
go func() {
211+
defer wg.Done()
214212
// Run a worker thread that just dequeues items, processes them, and marks them done.
215213
// It enforces that the reconcileHandler is never invoked concurrently with the same object.
216214
for c.processNextWorkItem(ctx) {
217215
}
218-
}, c.JitterPeriod)
216+
}()
219217
}
220218

221219
c.Started = true
@@ -226,7 +224,9 @@ func (c *Controller) Start(ctx context.Context) error {
226224
}
227225

228226
<-ctx.Done()
229-
c.Log.Info("Stopping workers")
227+
c.Log.Info("Shutdown signal received, waiting for all workers to finish")
228+
wg.Wait()
229+
c.Log.Info("All workers finished")
230230
return nil
231231
}
232232

pkg/internal/controller/controller_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -469,8 +469,6 @@ var _ = Describe("controller", func() {
469469
})
470470

471471
It("should requeue a Request if there is an error and continue processing items", func(done Done) {
472-
// Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun.
473-
ctrl.JitterPeriod = time.Millisecond
474472

475473
ctx, cancel := context.WithCancel(context.Background())
476474
defer cancel()
@@ -597,7 +595,6 @@ var _ = Describe("controller", func() {
597595
It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
598596
dq := &DelegatingQueue{RateLimitingInterface: ctrl.MakeQueue()}
599597
ctrl.MakeQueue = func() workqueue.RateLimitingInterface { return dq }
600-
ctrl.JitterPeriod = time.Millisecond
601598

602599
ctx, cancel := context.WithCancel(context.Background())
603600
defer cancel()
@@ -790,9 +787,6 @@ var _ = Describe("controller", func() {
790787
}()
791788
queue.Add(request)
792789

793-
// Reduce the jitterperiod so we don't have to wait a second before the reconcile function is rerun.
794-
ctrl.JitterPeriod = time.Millisecond
795-
796790
By("Invoking Reconciler which will give an error")
797791
fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
798792
Expect(<-reconciled).To(Equal(request))

0 commit comments

Comments
 (0)