Skip to content

Commit 7122ca0

Browse files
committed
Metric: Adds are only counted if the object didn't exist yet
1 parent 10671f6 commit 7122ca0

File tree

4 files changed

+38
-65
lines changed

4 files changed

+38
-65
lines changed

.golangci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ issues:
172172
- unused
173173
# Seems to incorrectly trigger on the two implementations that are only
174174
# used through an interface and not directly..?
175-
path: pkg/controllerworkqueue/metrics\.go
175+
path: pkg/controller/priorityqueue/metrics\.go
176176

177177
run:
178178
go: "1.23"

pkg/controller/priorityqueue/metrics.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ import (
1111

1212
// This file is mostly a copy of unexported code from
1313
// https://github.com/kubernetes/kubernetes/blob/1d8828ce707ed9dd7a6a9756385419cce1d202ac/staging/src/k8s.io/client-go/util/workqueue/metrics.go
14+
//
15+
// The only difference is the addition of mapLock in defaultQueueMetrics, we want to avoid the need of synchronizing updateUnfinishedWork()
16+
// with the queue.
1417

1518
type queueMetrics[T comparable] interface {
1619
add(item T)

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 27 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,10 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5656
opts.MetricProvider = metrics.WorkqueueMetricsProvider{}
5757
}
5858

59-
cwq := &priorityqueue[T]{
60-
items: map[T]*item[T]{},
61-
queue: btree.NewG(32, less[T]),
59+
pq := &priorityqueue[T]{
60+
items: map[T]*item[T]{},
61+
queue: btree.NewG(32, less[T]),
62+
metrics: newQueueMetrics[T](opts.MetricProvider, name, clock.RealClock{}),
6263
// itemOrWaiterAdded indicates that an item or
6364
// waiter was added. It must be buffered, because
6465
// if we currently process items we can't tell
@@ -72,25 +73,30 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
7273
tick: time.Tick,
7374
}
7475

75-
go cwq.spin()
76+
go pq.spin()
77+
if _, ok := pq.metrics.(noMetrics[T]); !ok {
78+
go pq.updateUnfinishedWorkLoop()
79+
}
7680

77-
return wrapWithMetrics(cwq, name, opts.MetricProvider)
81+
return pq
7882
}
7983

8084
type priorityqueue[T comparable] struct {
81-
// lock has to be acquired for any access to either items or queue
82-
lock sync.Mutex
83-
items map[T]*item[T]
84-
queue *btree.BTreeG[*item[T]]
85-
86-
itemOrWaiterAdded chan struct{}
87-
88-
rateLimiter workqueue.TypedRateLimiter[T]
85+
// lock has to be acquired for any access any of items, queue, addedCounter
86+
// or metrics.
87+
lock sync.Mutex
88+
items map[T]*item[T]
89+
queue *btree.BTreeG[*item[T]]
90+
metrics queueMetrics[T]
8991

9092
// addedCounter is a counter of elements added, we need it
9193
// because unixNano is not guaranteed to be unique.
9294
addedCounter uint64
9395

96+
itemOrWaiterAdded chan struct{}
97+
98+
rateLimiter workqueue.TypedRateLimiter[T]
99+
94100
// locked contains the keys we handed out through Get() and that haven't
95101
// yet been returned through Done().
96102
locked sets.Set[T]
@@ -136,6 +142,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
136142
}
137143
w.items[key] = item
138144
w.queue.ReplaceOrInsert(item)
145+
w.metrics.add(key)
139146
w.addedCounter++
140147
continue
141148
}
@@ -204,11 +211,12 @@ func (w *priorityqueue[T]) spin() {
204211
return true
205212
}
206213

207-
w.get <- *item
214+
w.metrics.get(item.key)
208215
w.locked.Insert(item.key)
209216
w.waiters.Add(-1)
210217
delete(w.items, item.key)
211218
w.queue.Delete(item)
219+
w.get <- *item
212220

213221
return true
214222
})
@@ -258,6 +266,7 @@ func (w *priorityqueue[T]) Done(item T) {
258266
w.lockedLock.Lock()
259267
defer w.lockedLock.Unlock()
260268
w.locked.Delete(item)
269+
w.metrics.done(item)
261270
w.notifyItemOrWaiterAdded()
262271
}
263272

@@ -306,52 +315,13 @@ type item[T comparable] struct {
306315
readyAt *time.Time
307316
}
308317

309-
func wrapWithMetrics[T comparable](q *priorityqueue[T], name string, provider workqueue.MetricsProvider) PriorityQueue[T] {
310-
mwq := &metricWrappedQueue[T]{
311-
priorityqueue: q,
312-
metrics: newQueueMetrics[T](provider, name, clock.RealClock{}),
313-
}
314-
315-
go mwq.updateUnfinishedWorkLoop()
316-
317-
return mwq
318-
}
319-
320-
type metricWrappedQueue[T comparable] struct {
321-
*priorityqueue[T]
322-
metrics queueMetrics[T]
323-
}
324-
325-
func (m *metricWrappedQueue[T]) AddWithOpts(o AddOpts, items ...T) {
326-
for _, item := range items {
327-
m.metrics.add(item)
328-
}
329-
m.priorityqueue.AddWithOpts(o, items...)
330-
}
331-
332-
func (m *metricWrappedQueue[T]) GetWithPriority() (T, int, bool) {
333-
item, priority, shutdown := m.priorityqueue.GetWithPriority()
334-
m.metrics.get(item)
335-
return item, priority, shutdown
336-
}
337-
338-
func (m *metricWrappedQueue[T]) Get() (T, bool) {
339-
item, _, shutdown := m.GetWithPriority()
340-
return item, shutdown
341-
}
342-
343-
func (m *metricWrappedQueue[T]) Done(item T) {
344-
m.metrics.done(item)
345-
m.priorityqueue.Done(item)
346-
}
347-
348-
func (m *metricWrappedQueue[T]) updateUnfinishedWorkLoop() {
349-
t := time.NewTicker(time.Millisecond)
318+
func (w *priorityqueue[T]) updateUnfinishedWorkLoop() {
319+
t := time.NewTicker(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182
350320
defer t.Stop()
351321
for range t.C {
352-
if m.priorityqueue.ShuttingDown() {
322+
if w.shutdown.Load() {
353323
return
354324
}
355-
m.metrics.updateUnfinishedWork()
325+
w.metrics.updateUnfinishedWork()
356326
}
357327
}

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,12 @@ var _ = Describe("Controllerworkqueue", func() {
9090

9191
Consistently(q.Len).Should(Equal(1))
9292

93-
cwq := q.(*metricWrappedQueue[string])
93+
cwq := q.(*priorityqueue[string])
9494
cwq.lockedLock.Lock()
9595
Expect(cwq.locked.Len()).To(Equal(0))
9696

9797
Expect(metrics.depth["test"]).To(Equal(1))
98-
Expect(metrics.adds["test"]).To(Equal(2))
98+
Expect(metrics.adds["test"]).To(Equal(1))
9999
})
100100

101101
It("retains the highest priority", func() {
@@ -112,7 +112,7 @@ var _ = Describe("Controllerworkqueue", func() {
112112
Expect(q.Len()).To(Equal(0))
113113

114114
Expect(metrics.depth["test"]).To(Equal(0))
115-
Expect(metrics.adds["test"]).To(Equal(2))
115+
Expect(metrics.adds["test"]).To(Equal(1))
116116
})
117117

118118
It("gets pushed to the front if the priority increases", func() {
@@ -131,7 +131,7 @@ var _ = Describe("Controllerworkqueue", func() {
131131
Expect(q.Len()).To(Equal(2))
132132

133133
Expect(metrics.depth["test"]).To(Equal(2))
134-
Expect(metrics.adds["test"]).To(Equal(4))
134+
Expect(metrics.adds["test"]).To(Equal(3))
135135
})
136136

137137
It("retains the lowest after duration", func() {
@@ -147,7 +147,7 @@ var _ = Describe("Controllerworkqueue", func() {
147147

148148
Expect(q.Len()).To(Equal(0))
149149
Expect(metrics.depth["test"]).To(Equal(0))
150-
Expect(metrics.adds["test"]).To(Equal(2))
150+
Expect(metrics.adds["test"]).To(Equal(1))
151151
})
152152

153153
It("returns an item only after after has passed", func() {
@@ -158,7 +158,7 @@ var _ = Describe("Controllerworkqueue", func() {
158158
nowLock := sync.Mutex{}
159159
tick := make(chan time.Time)
160160

161-
cwq := q.(*metricWrappedQueue[string])
161+
cwq := q.(*priorityqueue[string])
162162
cwq.now = func() time.Time {
163163
nowLock.Lock()
164164
defer nowLock.Unlock()
@@ -199,7 +199,7 @@ var _ = Describe("Controllerworkqueue", func() {
199199
nowLock := sync.Mutex{}
200200
tick := make(chan time.Time)
201201

202-
cwq := q.(*metricWrappedQueue[string])
202+
cwq := q.(*priorityqueue[string])
203203
cwq.now = func() time.Time {
204204
nowLock.Lock()
205205
defer nowLock.Unlock()

0 commit comments

Comments
 (0)