Skip to content

Commit 10671f6

Browse files
committed
Move into package priorityqueue
1 parent 50d59ce commit 10671f6

File tree

7 files changed

+38
-38
lines changed

7 files changed

+38
-38
lines changed

pkg/controller/controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
"k8s.io/client-go/util/workqueue"
2626
"k8s.io/klog/v2"
2727

28-
"sigs.k8s.io/controller-runtime/pkg/controllerworkqueue"
28+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2929
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
3030
"sigs.k8s.io/controller-runtime/pkg/manager"
3131
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -196,7 +196,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
196196
if options.NewQueue == nil {
197197
if mgr.GetControllerOptions().UsePriorityQueue {
198198
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
199-
return controllerworkqueue.New[request](controllerName)
199+
return priorityqueue.New[request](controllerName)
200200
}
201201
} else {
202202
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {

pkg/controllerworkqueue/metrics.go renamed to pkg/controller/priorityqueue/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package controllerworkqueue
1+
package priorityqueue
22

33
import (
44
"sync"

pkg/controllerworkqueue/metrics_test.go renamed to pkg/controller/priorityqueue/metrics_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package controllerworkqueue
1+
package priorityqueue
22

33
import (
44
"sync"

pkg/controllerworkqueue/workqueue.go renamed to pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package controllerworkqueue
1+
package priorityqueue
22

33
import (
44
"sync"
@@ -56,7 +56,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5656
opts.MetricProvider = metrics.WorkqueueMetricsProvider{}
5757
}
5858

59-
cwq := &controllerworkqueue[T]{
59+
cwq := &priorityqueue[T]{
6060
items: map[T]*item[T]{},
6161
queue: btree.NewG(32, less[T]),
6262
// itemOrWaiterAdded indicates that an item or
@@ -77,7 +77,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
7777
return wrapWithMetrics(cwq, name, opts.MetricProvider)
7878
}
7979

80-
type controllerworkqueue[T comparable] struct {
80+
type priorityqueue[T comparable] struct {
8181
// lock has to be acquired for any access to either items or queue
8282
lock sync.Mutex
8383
items map[T]*item[T]
@@ -110,7 +110,7 @@ type controllerworkqueue[T comparable] struct {
110110
tick func(time.Duration) <-chan time.Time
111111
}
112112

113-
func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) {
113+
func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
114114
w.lock.Lock()
115115
defer w.lock.Unlock()
116116

@@ -155,14 +155,14 @@ func (w *controllerworkqueue[T]) AddWithOpts(o AddOpts, items ...T) {
155155
}
156156
}
157157

158-
func (w *controllerworkqueue[T]) notifyItemOrWaiterAdded() {
158+
func (w *priorityqueue[T]) notifyItemOrWaiterAdded() {
159159
select {
160160
case w.itemOrWaiterAdded <- struct{}{}:
161161
default:
162162
}
163163
}
164164

165-
func (w *controllerworkqueue[T]) spin() {
165+
func (w *priorityqueue[T]) spin() {
166166
blockForever := make(chan time.Time)
167167
var nextReady <-chan time.Time
168168
nextReady = blockForever
@@ -216,19 +216,19 @@ func (w *controllerworkqueue[T]) spin() {
216216
}
217217
}
218218

219-
func (w *controllerworkqueue[T]) Add(item T) {
219+
func (w *priorityqueue[T]) Add(item T) {
220220
w.AddWithOpts(AddOpts{}, item)
221221
}
222222

223-
func (w *controllerworkqueue[T]) AddAfter(item T, after time.Duration) {
223+
func (w *priorityqueue[T]) AddAfter(item T, after time.Duration) {
224224
w.AddWithOpts(AddOpts{After: after}, item)
225225
}
226226

227-
func (w *controllerworkqueue[T]) AddRateLimited(item T) {
227+
func (w *priorityqueue[T]) AddRateLimited(item T) {
228228
w.AddWithOpts(AddOpts{RateLimited: true}, item)
229229
}
230230

231-
func (w *controllerworkqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) {
231+
func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) {
232232
w.waiters.Add(1)
233233

234234
w.notifyItemOrWaiterAdded()
@@ -237,40 +237,40 @@ func (w *controllerworkqueue[T]) GetWithPriority() (_ T, priority int, shutdown
237237
return item.key, item.priority, w.shutdown.Load()
238238
}
239239

240-
func (w *controllerworkqueue[T]) Get() (item T, shutdown bool) {
240+
func (w *priorityqueue[T]) Get() (item T, shutdown bool) {
241241
key, _, shutdown := w.GetWithPriority()
242242
return key, shutdown
243243
}
244244

245-
func (w *controllerworkqueue[T]) Forget(item T) {
245+
func (w *priorityqueue[T]) Forget(item T) {
246246
w.rateLimiter.Forget(item)
247247
}
248248

249-
func (w *controllerworkqueue[T]) NumRequeues(item T) int {
249+
func (w *priorityqueue[T]) NumRequeues(item T) int {
250250
return w.rateLimiter.NumRequeues(item)
251251
}
252252

253-
func (w *controllerworkqueue[T]) ShuttingDown() bool {
253+
func (w *priorityqueue[T]) ShuttingDown() bool {
254254
return w.shutdown.Load()
255255
}
256256

257-
func (w *controllerworkqueue[T]) Done(item T) {
257+
func (w *priorityqueue[T]) Done(item T) {
258258
w.lockedLock.Lock()
259259
defer w.lockedLock.Unlock()
260260
w.locked.Delete(item)
261261
w.notifyItemOrWaiterAdded()
262262
}
263263

264-
func (w *controllerworkqueue[T]) ShutDown() {
264+
func (w *priorityqueue[T]) ShutDown() {
265265
w.shutdown.Store(true)
266266
close(w.done)
267267
}
268268

269-
func (w *controllerworkqueue[T]) ShutDownWithDrain() {
269+
func (w *priorityqueue[T]) ShutDownWithDrain() {
270270
w.ShutDown()
271271
}
272272

273-
func (w *controllerworkqueue[T]) Len() int {
273+
func (w *priorityqueue[T]) Len() int {
274274
w.lock.Lock()
275275
defer w.lock.Unlock()
276276

@@ -306,10 +306,10 @@ type item[T comparable] struct {
306306
readyAt *time.Time
307307
}
308308

309-
func wrapWithMetrics[T comparable](q *controllerworkqueue[T], name string, provider workqueue.MetricsProvider) PriorityQueue[T] {
309+
func wrapWithMetrics[T comparable](q *priorityqueue[T], name string, provider workqueue.MetricsProvider) PriorityQueue[T] {
310310
mwq := &metricWrappedQueue[T]{
311-
controllerworkqueue: q,
312-
metrics: newQueueMetrics[T](provider, name, clock.RealClock{}),
311+
priorityqueue: q,
312+
metrics: newQueueMetrics[T](provider, name, clock.RealClock{}),
313313
}
314314

315315
go mwq.updateUnfinishedWorkLoop()
@@ -318,19 +318,19 @@ func wrapWithMetrics[T comparable](q *controllerworkqueue[T], name string, provi
318318
}
319319

320320
type metricWrappedQueue[T comparable] struct {
321-
*controllerworkqueue[T]
321+
*priorityqueue[T]
322322
metrics queueMetrics[T]
323323
}
324324

325325
func (m *metricWrappedQueue[T]) AddWithOpts(o AddOpts, items ...T) {
326326
for _, item := range items {
327327
m.metrics.add(item)
328328
}
329-
m.controllerworkqueue.AddWithOpts(o, items...)
329+
m.priorityqueue.AddWithOpts(o, items...)
330330
}
331331

332332
func (m *metricWrappedQueue[T]) GetWithPriority() (T, int, bool) {
333-
item, priority, shutdown := m.controllerworkqueue.GetWithPriority()
333+
item, priority, shutdown := m.priorityqueue.GetWithPriority()
334334
m.metrics.get(item)
335335
return item, priority, shutdown
336336
}
@@ -342,14 +342,14 @@ func (m *metricWrappedQueue[T]) Get() (T, bool) {
342342

343343
func (m *metricWrappedQueue[T]) Done(item T) {
344344
m.metrics.done(item)
345-
m.controllerworkqueue.Done(item)
345+
m.priorityqueue.Done(item)
346346
}
347347

348348
func (m *metricWrappedQueue[T]) updateUnfinishedWorkLoop() {
349349
t := time.NewTicker(time.Millisecond)
350350
defer t.Stop()
351351
for range t.C {
352-
if m.controllerworkqueue.ShuttingDown() {
352+
if m.priorityqueue.ShuttingDown() {
353353
return
354354
}
355355
m.metrics.updateUnfinishedWork()

pkg/controllerworkqueue/workqueue_suite_test.go renamed to pkg/controller/priorityqueue/priorityqueue_suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package controllerworkqueue
1+
package priorityqueue
22

33
import (
44
"testing"

pkg/controllerworkqueue/workqueue_test.go renamed to pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package controllerworkqueue
1+
package priorityqueue
22

33
import (
44
"sync"

pkg/handler/eventhandler.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222

2323
"k8s.io/client-go/util/workqueue"
2424
"sigs.k8s.io/controller-runtime/pkg/client"
25-
"sigs.k8s.io/controller-runtime/pkg/controllerworkqueue"
25+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2626
"sigs.k8s.io/controller-runtime/pkg/event"
2727
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2828
)
@@ -140,7 +140,7 @@ func (h TypedFuncs[object, request]) Generic(ctx context.Context, e event.TypedG
140140
const LowPriority = -100
141141

142142
// WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if
143-
// and only if a controllerworkqueue.PriorityQueue is used. If not, it does nothing.
143+
// and only if a priorityqueue.PriorityQueue is used. If not, it does nothing.
144144
func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] {
145145
return TypedFuncs[object, request]{
146146
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
@@ -149,7 +149,7 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
149149
u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{
150150
TypedRateLimitingInterface: trli,
151151
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
152-
priorityQueue, isPriorityQueue := q.(controllerworkqueue.PriorityQueue[request])
152+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
153153
if !isPriorityQueue {
154154
q.Add(item)
155155
return
@@ -158,15 +158,15 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
158158
if isObjectUnchanged(tce) {
159159
priority = LowPriority
160160
}
161-
priorityQueue.AddWithOpts(controllerworkqueue.AddOpts{Priority: priority}, item)
161+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
162162
},
163163
})
164164
},
165165
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
166166
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
167167
TypedRateLimitingInterface: trli,
168168
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
169-
priorityQueue, isPriorityQueue := q.(controllerworkqueue.PriorityQueue[request])
169+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
170170
if !isPriorityQueue {
171171
q.Add(item)
172172
return
@@ -175,7 +175,7 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
175175
if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() {
176176
priority = LowPriority
177177
}
178-
priorityQueue.AddWithOpts(controllerworkqueue.AddOpts{Priority: priority}, item)
178+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
179179
},
180180
})
181181
},

0 commit comments

Comments
 (0)