Skip to content

Commit 72a4107

Browse files
committed
🐛 Address review comments and fix metrics in priority queue
This change addresses Stefans review comments from the original PR and fixes a bug in the metrics where we always included items that are not ready yet - This is incorrect, metrics are only implemented on the [basequeue][0] in upstream, meaning they are only emitted for items that are ready. The impact of this was for example an incorrect queue_depth metric. [0]: https://github.com/kubernetes/kubernetes/blob/b1cb471982b74c13c26dbcc0f4e1b5ae92ea47e6/staging/src/k8s.io/client-go/util/workqueue/queue.go#L157
1 parent a9b7c2d commit 72a4107

File tree

9 files changed

+123
-36
lines changed

9 files changed

+123
-36
lines changed

examples/priorityqueue/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
corev1 "k8s.io/api/core/v1"
26+
"k8s.io/utils/ptr"
2627
"sigs.k8s.io/controller-runtime/pkg/builder"
2728
kubeconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
2829
"sigs.k8s.io/controller-runtime/pkg/config"
@@ -48,7 +49,7 @@ func run() error {
4849

4950
// Setup a Manager
5051
mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{
51-
Controller: config.Controller{UsePriorityQueue: true},
52+
Controller: config.Controller{UsePriorityQueue: ptr.To(true)},
5253
})
5354
if err != nil {
5455
return fmt.Errorf("failed to set up controller-manager: %w", err)

pkg/config/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,5 @@ type Controller struct {
5858
// priority queue.
5959
//
6060
// Note: This flag is disabled by default until a future version. It's currently in beta.
61-
UsePriorityQueue bool
61+
UsePriorityQueue *bool
6262
}

pkg/controller/controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/go-logr/logr"
2525
"k8s.io/client-go/util/workqueue"
2626
"k8s.io/klog/v2"
27+
"k8s.io/utils/ptr"
2728

2829
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2930
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
@@ -190,7 +191,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
190191
}
191192

192193
if options.RateLimiter == nil {
193-
if mgr.GetControllerOptions().UsePriorityQueue {
194+
if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
194195
options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second)
195196
} else {
196197
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
@@ -199,7 +200,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
199200

200201
if options.NewQueue == nil {
201202
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
202-
if mgr.GetControllerOptions().UsePriorityQueue {
203+
if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
203204
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
204205
o.RateLimiter = rateLimiter
205206
})

pkg/controller/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ var _ = Describe("controller.Controller", func() {
441441

442442
It("should configure a priority queue if UsePriorityQueue is set", func() {
443443
m, err := manager.New(cfg, manager.Options{
444-
Controller: config.Controller{UsePriorityQueue: true},
444+
Controller: config.Controller{UsePriorityQueue: ptr.To(true)},
445445
})
446446
Expect(err).NotTo(HaveOccurred())
447447

pkg/controller/priorityqueue/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type defaultQueueMetrics[T comparable] struct {
6666
retries workqueue.CounterMetric
6767
}
6868

69+
// add is called for ready items only
6970
func (m *defaultQueueMetrics[T]) add(item T) {
7071
if m == nil {
7172
return

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5757
}
5858

5959
pq := &priorityqueue[T]{
60-
items: map[T]*item[T]{},
61-
queue: btree.NewG(32, less[T]),
62-
metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
60+
items: map[T]*item[T]{},
61+
queue: btree.NewG(32, less[T]),
62+
becameReady: sets.Set[T]{},
63+
metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
6364
// itemOrWaiterAdded indicates that an item or
6465
// waiter was added. It must be buffered, because
6566
// if we currently process items we can't tell
@@ -83,16 +84,21 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
8384

8485
type priorityqueue[T comparable] struct {
8586
// lock has to be acquired for any access any of items, queue, addedCounter
86-
// or metrics.
87-
lock sync.Mutex
88-
items map[T]*item[T]
89-
queue bTree[*item[T]]
90-
metrics queueMetrics[T]
87+
// or becameReady
88+
lock sync.Mutex
89+
items map[T]*item[T]
90+
queue bTree[*item[T]]
9191

9292
// addedCounter is a counter of elements added, we need it
9393
// because unixNano is not guaranteed to be unique.
9494
addedCounter uint64
9595

96+
// becameReady holds items that are in the queue, were added
97+
// with non-zero after and became ready. We need it to call the
98+
// metrics add exactly once for them.
99+
becameReady sets.Set[T]
100+
metrics queueMetrics[T]
101+
96102
itemOrWaiterAdded chan struct{}
97103

98104
rateLimiter workqueue.TypedRateLimiter[T]
@@ -142,7 +148,9 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
142148
}
143149
w.items[key] = item
144150
w.queue.ReplaceOrInsert(item)
145-
w.metrics.add(key)
151+
if item.readyAt == nil {
152+
w.metrics.add(key)
153+
}
146154
w.addedCounter++
147155
continue
148156
}
@@ -196,18 +204,21 @@ func (w *priorityqueue[T]) spin() {
196204
defer w.lockedLock.Unlock()
197205

198206
w.queue.Ascend(func(item *item[T]) bool {
199-
if w.waiters.Load() == 0 { // no waiters, return as we can not hand anything out anyways
200-
return false
207+
if item.readyAt != nil {
208+
if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 {
209+
nextReady = w.tick(readyAt)
210+
return false
211+
}
212+
if !w.becameReady.Has(item.key) {
213+
w.metrics.add(item.key)
214+
w.becameReady.Insert(item.key)
215+
}
201216
}
202217

203-
// No next element we can process
204-
if item.readyAt != nil && item.readyAt.After(w.now()) {
205-
readyAt := item.readyAt.Sub(w.now())
206-
if readyAt <= 0 { // Toctou race with the above check
207-
readyAt = 1
208-
}
209-
nextReady = w.tick(readyAt)
210-
return false
218+
if w.waiters.Load() == 0 {
219+
// Have to keep iterating here to ensure we update metrics
220+
// for further items that became ready and set nextReady.
221+
return true
211222
}
212223

213224
// Item is locked, we can not hand it out
@@ -220,6 +231,7 @@ func (w *priorityqueue[T]) spin() {
220231
w.waiters.Add(-1)
221232
delete(w.items, item.key)
222233
w.queue.Delete(item)
234+
w.becameReady.Delete(item.key)
223235
w.get <- *item
224236

225237
return true
@@ -279,22 +291,36 @@ func (w *priorityqueue[T]) ShutDown() {
279291
close(w.done)
280292
}
281293

294+
// ShutDownWithDrain just calls ShutDown, as the draining
295+
// functionality is not used by controller-runtime.
282296
func (w *priorityqueue[T]) ShutDownWithDrain() {
283297
w.ShutDown()
284298
}
285299

300+
// Len returns the number of items that are ready to be
301+
// picked up. It does not include items that are not yet
302+
// ready.
286303
func (w *priorityqueue[T]) Len() int {
287304
w.lock.Lock()
288305
defer w.lock.Unlock()
289306

290-
return w.queue.Len()
307+
var result int
308+
w.queue.Ascend(func(item *item[T]) bool {
309+
if item.readyAt == nil || item.readyAt.Compare(w.now()) <= 0 {
310+
result++
311+
return true
312+
}
313+
return false
314+
})
315+
316+
return result
291317
}
292318

293319
func less[T comparable](a, b *item[T]) bool {
294320
if a.readyAt == nil && b.readyAt != nil {
295321
return true
296322
}
297-
if a.readyAt != nil && b.readyAt == nil {
323+
if b.readyAt == nil && a.readyAt != nil {
298324
return false
299325
}
300326
if a.readyAt != nil && b.readyAt != nil && !a.readyAt.Equal(*b.readyAt) {
@@ -329,5 +355,4 @@ type bTree[T any] interface {
329355
ReplaceOrInsert(item T) (_ T, _ bool)
330356
Delete(item T) (T, bool)
331357
Ascend(iterator btree.ItemIteratorG[T])
332-
Len() int
333358
}

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,67 @@ var _ = Describe("Controllerworkqueue", func() {
283283
Expect(metrics.depth["test"]).To(Equal(0))
284284
Expect(metrics.adds["test"]).To(Equal(2))
285285
})
286+
287+
It("doesn't include non-ready items in Len()", func() {
288+
q, metrics := newQueue()
289+
defer q.ShutDown()
290+
291+
q.AddWithOpts(AddOpts{After: time.Minute}, "foo")
292+
q.AddWithOpts(AddOpts{}, "baz")
293+
q.AddWithOpts(AddOpts{After: time.Minute}, "bar")
294+
q.AddWithOpts(AddOpts{}, "bal")
295+
296+
Expect(q.Len()).To(Equal(2))
297+
Expect(metrics.depth).To(HaveLen(1))
298+
Expect(metrics.depth["test"]).To(Equal(2))
299+
})
300+
301+
It("items are included in Len() and the queueDepth metric once they are ready", func() {
302+
q, metrics := newQueue()
303+
defer q.ShutDown()
304+
305+
q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo")
306+
q.AddWithOpts(AddOpts{}, "baz")
307+
q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar")
308+
q.AddWithOpts(AddOpts{}, "bal")
309+
310+
Expect(q.Len()).To(Equal(2))
311+
metrics.mu.Lock()
312+
Expect(metrics.depth["test"]).To(Equal(2))
313+
metrics.mu.Unlock()
314+
time.Sleep(time.Second)
315+
Expect(q.Len()).To(Equal(4))
316+
metrics.mu.Lock()
317+
Expect(metrics.depth["test"]).To(Equal(4))
318+
metrics.mu.Unlock()
319+
320+
// Drain queue
321+
for range 4 {
322+
item, _ := q.Get()
323+
q.Done(item)
324+
}
325+
Expect(q.Len()).To(Equal(0))
326+
metrics.mu.Lock()
327+
Expect(metrics.depth["test"]).To(Equal(0))
328+
metrics.mu.Unlock()
329+
330+
// Validate that doing it again still works to notice bugs with removing
331+
// it from the queues becameReady tracking.
332+
q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo")
333+
q.AddWithOpts(AddOpts{}, "baz")
334+
q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar")
335+
q.AddWithOpts(AddOpts{}, "bal")
336+
337+
Expect(q.Len()).To(Equal(2))
338+
metrics.mu.Lock()
339+
Expect(metrics.depth["test"]).To(Equal(2))
340+
metrics.mu.Unlock()
341+
time.Sleep(time.Second)
342+
Expect(q.Len()).To(Equal(4))
343+
metrics.mu.Lock()
344+
Expect(metrics.depth["test"]).To(Equal(4))
345+
metrics.mu.Unlock()
346+
})
286347
})
287348

288349
func BenchmarkAddGetDone(b *testing.B) {
@@ -438,10 +499,6 @@ func TestFuzzPrioriorityQueue(t *testing.T) {
438499
}
439500

440501
wg.Wait()
441-
442-
if expected := len(inQueue); expected != q.Len() {
443-
t.Errorf("Expected queue length to be %d, was %d", expected, q.Len())
444-
}
445502
}
446503

447504
func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
@@ -453,6 +510,8 @@ func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
453510
bTree: q.(*priorityqueue[string]).queue,
454511
}
455512

513+
// validate that tick always gets a positive value as it will just return
514+
// nil otherwise, which results in blocking forever.
456515
upstreamTick := q.(*priorityqueue[string]).tick
457516
q.(*priorityqueue[string]).tick = func(d time.Duration) <-chan time.Time {
458517
if d <= 0 {
@@ -477,7 +536,7 @@ func (b *btreeInteractionValidator) ReplaceOrInsert(item *item[string]) (*item[s
477536
}
478537

479538
func (b *btreeInteractionValidator) Delete(item *item[string]) (*item[string], bool) {
480-
// There is node codepath that deletes an item that doesn't exist
539+
// There is no codepath that deletes an item that doesn't exist
481540
old, existed := b.bTree.Delete(item)
482541
if !existed {
483542
panic(fmt.Sprintf("Delete: item %v not found", item))

pkg/handler/eventhandler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
194194
}
195195

196196
// isObjectUnchanged checks if the object in a create event is unchanged, for example because
197-
// we got it in our initial listwatch or because of a resync. The heuristic it uses is to check
198-
// if the object is older than one minute.
197+
// we got it in our initial listwatch. The heuristic it uses is to check if the object is older
198+
// than one minute.
199199
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
200200
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
201201
}

pkg/handler/eventhandler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,7 @@ var _ = Describe("Eventhandler", func() {
776776
})
777777

778778
Describe("WithLowPriorityWhenUnchanged", func() {
779-
It("should lower the priority of a create request for an object that was crated more than one minute in the past", func() {
779+
It("should lower the priority of a create request for an object that was created more than one minute in the past", func() {
780780
actualOpts := priorityqueue.AddOpts{}
781781
var actualRequests []reconcile.Request
782782
wq := &fakePriorityQueue{
@@ -797,7 +797,7 @@ var _ = Describe("Eventhandler", func() {
797797
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
798798
})
799799

800-
It("should not lower the priority of a create request for an object that was crated less than one minute in the past", func() {
800+
It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
801801
actualOpts := priorityqueue.AddOpts{}
802802
var actualRequests []reconcile.Request
803803
wq := &fakePriorityQueue{

0 commit comments

Comments
 (0)