Skip to content

Commit c80ea33

Browse files
authored
Merge pull request #3060 from alvaroaleman/pq-2
🐛 Bugfixes for priority queue
2 parents 1947a94 + 462341c commit c80ea33

File tree

9 files changed

+166
-37
lines changed

9 files changed

+166
-37
lines changed

examples/priorityqueue/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
corev1 "k8s.io/api/core/v1"
26+
"k8s.io/utils/ptr"
2627
"sigs.k8s.io/controller-runtime/pkg/builder"
2728
kubeconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
2829
"sigs.k8s.io/controller-runtime/pkg/config"
@@ -48,7 +49,7 @@ func run() error {
4849

4950
// Setup a Manager
5051
mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{
51-
Controller: config.Controller{UsePriorityQueue: true},
52+
Controller: config.Controller{UsePriorityQueue: ptr.To(true)},
5253
})
5354
if err != nil {
5455
return fmt.Errorf("failed to set up controller-manager: %w", err)

pkg/config/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,5 @@ type Controller struct {
5858
// priority queue.
5959
//
6060
// Note: This flag is disabled by default until a future version. It's currently in beta.
61-
UsePriorityQueue bool
61+
UsePriorityQueue *bool
6262
}

pkg/controller/controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/go-logr/logr"
2525
"k8s.io/client-go/util/workqueue"
2626
"k8s.io/klog/v2"
27+
"k8s.io/utils/ptr"
2728

2829
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2930
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
@@ -190,7 +191,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
190191
}
191192

192193
if options.RateLimiter == nil {
193-
if mgr.GetControllerOptions().UsePriorityQueue {
194+
if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
194195
options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second)
195196
} else {
196197
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
@@ -199,7 +200,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
199200

200201
if options.NewQueue == nil {
201202
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
202-
if mgr.GetControllerOptions().UsePriorityQueue {
203+
if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
203204
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
204205
o.RateLimiter = rateLimiter
205206
})

pkg/controller/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ var _ = Describe("controller.Controller", func() {
441441

442442
It("should configure a priority queue if UsePriorityQueue is set", func() {
443443
m, err := manager.New(cfg, manager.Options{
444-
Controller: config.Controller{UsePriorityQueue: true},
444+
Controller: config.Controller{UsePriorityQueue: ptr.To(true)},
445445
})
446446
Expect(err).NotTo(HaveOccurred())
447447

pkg/controller/priorityqueue/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type defaultQueueMetrics[T comparable] struct {
6666
retries workqueue.CounterMetric
6767
}
6868

69+
// add is called for ready items only
6970
func (m *defaultQueueMetrics[T]) add(item T) {
7071
if m == nil {
7172
return

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 55 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5757
}
5858

5959
pq := &priorityqueue[T]{
60-
items: map[T]*item[T]{},
61-
queue: btree.NewG(32, less[T]),
62-
metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
60+
items: map[T]*item[T]{},
61+
queue: btree.NewG(32, less[T]),
62+
becameReady: sets.Set[T]{},
63+
metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
6364
// itemOrWaiterAdded indicates that an item or
6465
// waiter was added. It must be buffered, because
6566
// if we currently process items we can't tell
@@ -83,16 +84,21 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
8384

8485
type priorityqueue[T comparable] struct {
8586
// lock has to be acquired for any access any of items, queue, addedCounter
86-
// or metrics.
87-
lock sync.Mutex
88-
items map[T]*item[T]
89-
queue bTree[*item[T]]
90-
metrics queueMetrics[T]
87+
// or becameReady
88+
lock sync.Mutex
89+
items map[T]*item[T]
90+
queue bTree[*item[T]]
9191

9292
// addedCounter is a counter of elements added, we need it
9393
// because unixNano is not guaranteed to be unique.
9494
addedCounter uint64
9595

96+
// becameReady holds items that are in the queue, were added
97+
// with non-zero after and became ready. We need it to call the
98+
// metrics add exactly once for them.
99+
becameReady sets.Set[T]
100+
metrics queueMetrics[T]
101+
96102
itemOrWaiterAdded chan struct{}
97103

98104
rateLimiter workqueue.TypedRateLimiter[T]
@@ -142,7 +148,9 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
142148
}
143149
w.items[key] = item
144150
w.queue.ReplaceOrInsert(item)
145-
w.metrics.add(key)
151+
if item.readyAt == nil {
152+
w.metrics.add(key)
153+
}
146154
w.addedCounter++
147155
continue
148156
}
@@ -195,19 +203,25 @@ func (w *priorityqueue[T]) spin() {
195203
w.lockedLock.Lock()
196204
defer w.lockedLock.Unlock()
197205

206+
// manipulating the tree from within Ascend might lead to panics, so
207+
// track what we want to delete and do it after we are done ascending.
208+
var toDelete []*item[T]
198209
w.queue.Ascend(func(item *item[T]) bool {
199-
if w.waiters.Load() == 0 { // no waiters, return as we can not hand anything out anyways
200-
return false
210+
if item.readyAt != nil {
211+
if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 {
212+
nextReady = w.tick(readyAt)
213+
return false
214+
}
215+
if !w.becameReady.Has(item.key) {
216+
w.metrics.add(item.key)
217+
w.becameReady.Insert(item.key)
218+
}
201219
}
202220

203-
// No next element we can process
204-
if item.readyAt != nil && item.readyAt.After(w.now()) {
205-
readyAt := item.readyAt.Sub(w.now())
206-
if readyAt <= 0 { // Toctou race with the above check
207-
readyAt = 1
208-
}
209-
nextReady = w.tick(readyAt)
210-
return false
221+
if w.waiters.Load() == 0 {
222+
// Have to keep iterating here to ensure we update metrics
223+
// for further items that became ready and set nextReady.
224+
return true
211225
}
212226

213227
// Item is locked, we can not hand it out
@@ -219,11 +233,16 @@ func (w *priorityqueue[T]) spin() {
219233
w.locked.Insert(item.key)
220234
w.waiters.Add(-1)
221235
delete(w.items, item.key)
222-
w.queue.Delete(item)
236+
toDelete = append(toDelete, item)
237+
w.becameReady.Delete(item.key)
223238
w.get <- *item
224239

225240
return true
226241
})
242+
243+
for _, item := range toDelete {
244+
w.queue.Delete(item)
245+
}
227246
}()
228247
}
229248
}
@@ -279,22 +298,36 @@ func (w *priorityqueue[T]) ShutDown() {
279298
close(w.done)
280299
}
281300

301+
// ShutDownWithDrain just calls ShutDown, as the draining
302+
// functionality is not used by controller-runtime.
282303
func (w *priorityqueue[T]) ShutDownWithDrain() {
283304
w.ShutDown()
284305
}
285306

307+
// Len returns the number of items that are ready to be
308+
// picked up. It does not include items that are not yet
309+
// ready.
286310
func (w *priorityqueue[T]) Len() int {
287311
w.lock.Lock()
288312
defer w.lock.Unlock()
289313

290-
return w.queue.Len()
314+
var result int
315+
w.queue.Ascend(func(item *item[T]) bool {
316+
if item.readyAt == nil || item.readyAt.Compare(w.now()) <= 0 {
317+
result++
318+
return true
319+
}
320+
return false
321+
})
322+
323+
return result
291324
}
292325

293326
func less[T comparable](a, b *item[T]) bool {
294327
if a.readyAt == nil && b.readyAt != nil {
295328
return true
296329
}
297-
if a.readyAt != nil && b.readyAt == nil {
330+
if b.readyAt == nil && a.readyAt != nil {
298331
return false
299332
}
300333
if a.readyAt != nil && b.readyAt != nil && !a.readyAt.Equal(*b.readyAt) {
@@ -329,5 +362,4 @@ type bTree[T any] interface {
329362
ReplaceOrInsert(item T) (_ T, _ bool)
330363
Delete(item T) (T, bool)
331364
Ascend(iterator btree.ItemIteratorG[T])
332-
Len() int
333365
}

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 99 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package priorityqueue
22

33
import (
44
"fmt"
5+
"math/rand/v2"
56
"sync"
67
"testing"
78
"time"
@@ -283,6 +284,101 @@ var _ = Describe("Controllerworkqueue", func() {
283284
Expect(metrics.depth["test"]).To(Equal(0))
284285
Expect(metrics.adds["test"]).To(Equal(2))
285286
})
287+
288+
It("doesn't include non-ready items in Len()", func() {
289+
q, metrics := newQueue()
290+
defer q.ShutDown()
291+
292+
q.AddWithOpts(AddOpts{After: time.Minute}, "foo")
293+
q.AddWithOpts(AddOpts{}, "baz")
294+
q.AddWithOpts(AddOpts{After: time.Minute}, "bar")
295+
q.AddWithOpts(AddOpts{}, "bal")
296+
297+
Expect(q.Len()).To(Equal(2))
298+
Expect(metrics.depth).To(HaveLen(1))
299+
Expect(metrics.depth["test"]).To(Equal(2))
300+
})
301+
302+
It("items are included in Len() and the queueDepth metric once they are ready", func() {
303+
q, metrics := newQueue()
304+
defer q.ShutDown()
305+
306+
q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo")
307+
q.AddWithOpts(AddOpts{}, "baz")
308+
q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar")
309+
q.AddWithOpts(AddOpts{}, "bal")
310+
311+
Expect(q.Len()).To(Equal(2))
312+
metrics.mu.Lock()
313+
Expect(metrics.depth["test"]).To(Equal(2))
314+
metrics.mu.Unlock()
315+
time.Sleep(time.Second)
316+
Expect(q.Len()).To(Equal(4))
317+
metrics.mu.Lock()
318+
Expect(metrics.depth["test"]).To(Equal(4))
319+
metrics.mu.Unlock()
320+
321+
// Drain queue
322+
for range 4 {
323+
item, _ := q.Get()
324+
q.Done(item)
325+
}
326+
Expect(q.Len()).To(Equal(0))
327+
metrics.mu.Lock()
328+
Expect(metrics.depth["test"]).To(Equal(0))
329+
metrics.mu.Unlock()
330+
331+
// Validate that doing it again still works to notice bugs with removing
332+
// it from the queues becameReady tracking.
333+
q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "foo")
334+
q.AddWithOpts(AddOpts{}, "baz")
335+
q.AddWithOpts(AddOpts{After: 500 * time.Millisecond}, "bar")
336+
q.AddWithOpts(AddOpts{}, "bal")
337+
338+
Expect(q.Len()).To(Equal(2))
339+
metrics.mu.Lock()
340+
Expect(metrics.depth["test"]).To(Equal(2))
341+
metrics.mu.Unlock()
342+
time.Sleep(time.Second)
343+
Expect(q.Len()).To(Equal(4))
344+
metrics.mu.Lock()
345+
Expect(metrics.depth["test"]).To(Equal(4))
346+
metrics.mu.Unlock()
347+
})
348+
349+
It("returns many items", func() {
350+
// This test ensures the queue is able to drain a large queue without panic'ing.
351+
// In a previous version of the code we were calling queue.Delete within q.Ascend
352+
// which led to a panic in queue.Ascend > iterate:
353+
// "panic: runtime error: index out of range [0] with length 0"
354+
q, _ := newQueue()
355+
defer q.ShutDown()
356+
357+
for range 20 {
358+
for i := range 1000 {
359+
rn := rand.N(100) //nolint:gosec // We don't need cryptographically secure entropy here
360+
if rn < 10 {
361+
q.AddWithOpts(AddOpts{After: time.Duration(rn) * time.Millisecond}, fmt.Sprintf("foo%d", i))
362+
} else {
363+
q.AddWithOpts(AddOpts{Priority: rn}, fmt.Sprintf("foo%d", i))
364+
}
365+
}
366+
367+
wg := sync.WaitGroup{}
368+
for range 100 { // The panic only occurred relatively frequently with a high number of go routines.
369+
wg.Add(1)
370+
go func() {
371+
defer wg.Done()
372+
for range 10 {
373+
obj, _, _ := q.GetWithPriority()
374+
q.Done(obj)
375+
}
376+
}()
377+
}
378+
379+
wg.Wait()
380+
}
381+
})
286382
})
287383

288384
func BenchmarkAddGetDone(b *testing.B) {
@@ -438,10 +534,6 @@ func TestFuzzPrioriorityQueue(t *testing.T) {
438534
}
439535

440536
wg.Wait()
441-
442-
if expected := len(inQueue); expected != q.Len() {
443-
t.Errorf("Expected queue length to be %d, was %d", expected, q.Len())
444-
}
445537
}
446538

447539
func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
@@ -453,6 +545,8 @@ func newQueue() (PriorityQueue[string], *fakeMetricsProvider) {
453545
bTree: q.(*priorityqueue[string]).queue,
454546
}
455547

548+
// validate that tick always gets a positive value as it will just return
549+
// nil otherwise, which results in blocking forever.
456550
upstreamTick := q.(*priorityqueue[string]).tick
457551
q.(*priorityqueue[string]).tick = func(d time.Duration) <-chan time.Time {
458552
if d <= 0 {
@@ -477,7 +571,7 @@ func (b *btreeInteractionValidator) ReplaceOrInsert(item *item[string]) (*item[s
477571
}
478572

479573
func (b *btreeInteractionValidator) Delete(item *item[string]) (*item[string], bool) {
480-
// There is node codepath that deletes an item that doesn't exist
574+
// There is no codepath that deletes an item that doesn't exist
481575
old, existed := b.bTree.Delete(item)
482576
if !existed {
483577
panic(fmt.Sprintf("Delete: item %v not found", item))

pkg/handler/eventhandler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
194194
}
195195

196196
// isObjectUnchanged checks if the object in a create event is unchanged, for example because
197-
// we got it in our initial listwatch or because of a resync. The heuristic it uses is to check
198-
// if the object is older than one minute.
197+
// we got it in our initial listwatch. The heuristic it uses is to check if the object is older
198+
// than one minute.
199199
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
200200
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
201201
}

pkg/handler/eventhandler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -776,7 +776,7 @@ var _ = Describe("Eventhandler", func() {
776776
})
777777

778778
Describe("WithLowPriorityWhenUnchanged", func() {
779-
It("should lower the priority of a create request for an object that was crated more than one minute in the past", func() {
779+
It("should lower the priority of a create request for an object that was created more than one minute in the past", func() {
780780
actualOpts := priorityqueue.AddOpts{}
781781
var actualRequests []reconcile.Request
782782
wq := &fakePriorityQueue{
@@ -797,7 +797,7 @@ var _ = Describe("Eventhandler", func() {
797797
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
798798
})
799799

800-
It("should not lower the priority of a create request for an object that was crated less than one minute in the past", func() {
800+
It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
801801
actualOpts := priorityqueue.AddOpts{}
802802
var actualRequests []reconcile.Request
803803
wq := &fakePriorityQueue{

0 commit comments

Comments
 (0)