Skip to content

[release-0.20] 🌱 Handlers: Default to LowPriorityWhenUnchanged without a wrapper #3179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (blder *TypedBuilder[request]) Watches(
) *TypedBuilder[request] {
input := WatchesInput[request]{
obj: object,
handler: handler.WithLowPriorityWhenUnchanged(eventHandler),
handler: eventHandler,
}
for _, opt := range opts {
opt.ApplyToWatches(&input)
Expand Down Expand Up @@ -317,7 +317,7 @@ func (blder *TypedBuilder[request]) doWatch() error {
}

var hdler handler.TypedEventHandler[client.Object, request]
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})))
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(&handler.EnqueueRequestForObject{}))
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
Expand All @@ -341,11 +341,11 @@ func (blder *TypedBuilder[request]) doWatch() error {
}

var hdler handler.TypedEventHandler[client.Object, request]
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(handler.EnqueueRequestForOwner(
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.EnqueueRequestForOwner(
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
blder.forInput.object,
opts...,
))))
)))
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
Expand Down
19 changes: 13 additions & 6 deletions pkg/handler/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,32 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{

item := reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}}

addToQueueCreate(q, evt, item)
}

// Update implements EventHandler.
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
switch {
case !isNil(evt.ObjectNew):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
item := reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectNew.GetName(),
Namespace: evt.ObjectNew.GetNamespace(),
}})
}}

addToQueueUpdate(q, evt, item)
case !isNil(evt.ObjectOld):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
item := reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
}})
}}

addToQueueUpdate(q, evt, item)
default:
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
}
Expand Down
45 changes: 36 additions & 9 deletions pkg/handler/enqueue_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -63,15 +64,17 @@ func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
// TypedEnqueueRequestsFromMapFunc is experimental and subject to future change.
func TypedEnqueueRequestsFromMapFunc[object any, request comparable](fn TypedMapFunc[object, request]) TypedEventHandler[object, request] {
return &enqueueRequestsFromMapFunc[object, request]{
toRequests: fn,
toRequests: fn,
objectImplementsClientObject: implementsClientObject[object](),
}
}

var _ EventHandler = &enqueueRequestsFromMapFunc[client.Object, reconcile.Request]{}

type enqueueRequestsFromMapFunc[object any, request comparable] struct {
// Mapper transforms the argument into a slice of keys to be reconciled
toRequests TypedMapFunc[object, request]
toRequests TypedMapFunc[object, request]
objectImplementsClientObject bool
}

// Create implements EventHandler.
Expand All @@ -81,7 +84,15 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create(
q workqueue.TypedRateLimitingInterface[request],
) {
reqs := map[request]empty{}
e.mapAndEnqueue(ctx, q, evt.Object, reqs)

var lowPriority bool
if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.Object) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to also cherry-pick: #3162

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take that back. That would introduce too many changes

clientObjectEvent := event.CreateEvent{Object: any(evt.Object).(client.Object)}
if isObjectUnchanged(clientObjectEvent) {
lowPriority = true
}
}
e.mapAndEnqueue(ctx, q, evt.Object, reqs, lowPriority)
}

// Update implements EventHandler.
Expand All @@ -90,9 +101,13 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Update(
evt event.TypedUpdateEvent[object],
q workqueue.TypedRateLimitingInterface[request],
) {
var lowPriority bool
if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.ObjectOld) && !isNil(evt.ObjectNew) {
lowPriority = any(evt.ObjectOld).(client.Object).GetResourceVersion() == any(evt.ObjectNew).(client.Object).GetResourceVersion()
}
reqs := map[request]empty{}
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs, lowPriority)
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs, lowPriority)
}

// Delete implements EventHandler.
Expand All @@ -102,7 +117,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Delete(
q workqueue.TypedRateLimitingInterface[request],
) {
reqs := map[request]empty{}
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
e.mapAndEnqueue(ctx, q, evt.Object, reqs, false)
}

// Generic implements EventHandler.
Expand All @@ -112,14 +127,26 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Generic(
q workqueue.TypedRateLimitingInterface[request],
) {
reqs := map[request]empty{}
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
e.mapAndEnqueue(ctx, q, evt.Object, reqs, false)
}

func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(ctx context.Context, q workqueue.TypedRateLimitingInterface[request], o object, reqs map[request]empty) {
func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(
ctx context.Context,
q workqueue.TypedRateLimitingInterface[request],
o object,
reqs map[request]empty,
lowPriority bool,
) {
for _, req := range e.toRequests(ctx, o) {
_, ok := reqs[req]
if !ok {
q.Add(req)
if lowPriority {
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{
Priority: LowPriority,
}, req)
} else {
q.Add(req)
}
reqs[req] = empty{}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/enqueue_owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TypedEnqueueRequestForOwner[object client.Object](scheme *runtime.Scheme, m
for _, opt := range opts {
opt(e)
}
return e
return WithLowPriorityWhenUnchanged(e)
}

// OnlyControllerOwner if provided will only look at the first OwnerReference with Controller: true.
Expand Down
132 changes: 94 additions & 38 deletions pkg/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package handler

import (
"context"
"reflect"
"time"

"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -108,10 +109,46 @@ type TypedFuncs[object any, request comparable] struct {
GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
}

var typeForClientObject = reflect.TypeFor[client.Object]()

func implementsClientObject[object any]() bool {
return reflect.TypeFor[object]().Implements(typeForClientObject)
}

func isPriorityQueue[request comparable](q workqueue.TypedRateLimitingInterface[request]) bool {
_, ok := q.(priorityqueue.PriorityQueue[request])
return ok
}

// Create implements EventHandler.
func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
if h.CreateFunc != nil {
h.CreateFunc(ctx, e, q)
if !implementsClientObject[object]() || !isPriorityQueue(q) || isNil(e.Object) {
h.CreateFunc(ctx, e, q)
return
}
wq := workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: q,
// We already know that we have a priority queue, that event.Object implements
// client.Object and that its not nil
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
// We construct a new event typed to client.Object because isObjectUnchanged
// is a generic and hence has to know at compile time the type of the event
// it gets. We only figure that out at runtime though, but we know for sure
// that it implements client.Object at this point so we can hardcode the event
// type to that.
evt := event.CreateEvent{Object: any(e.Object).(client.Object)}
var priority int
if isObjectUnchanged(evt) {
priority = LowPriority
}
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
priorityqueue.AddOpts{Priority: priority},
item,
)
},
}
h.CreateFunc(ctx, e, wq)
}
}

Expand All @@ -125,7 +162,27 @@ func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDe
// Update implements EventHandler.
func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
if h.UpdateFunc != nil {
h.UpdateFunc(ctx, e, q)
if !implementsClientObject[object]() || !isPriorityQueue(q) || isNil(e.ObjectOld) || isNil(e.ObjectNew) {
h.UpdateFunc(ctx, e, q)
return
}

wq := workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: q,
// We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement
// client.Object and that they are not nil
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
var priority int
if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() {
priority = LowPriority
}
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
priorityqueue.AddOpts{Priority: priority},
item,
)
},
}
h.UpdateFunc(ctx, e, wq)
}
}

Expand All @@ -142,43 +199,10 @@ const LowPriority = -100
// WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if
// and only if a priorityqueue.PriorityQueue is used. If not, it does nothing.
func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] {
// TypedFuncs already implements this so just wrap
return TypedFuncs[object, request]{
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
// Due to how the handlers are factored, we have to wrap the workqueue to be able
// to inject custom behavior.
u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: trli,
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
if !isPriorityQueue {
q.Add(item)
return
}
var priority int
if isObjectUnchanged(tce) {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
},
})
},
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: trli,
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
if !isPriorityQueue {
q.Add(item)
return
}
var priority int
if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
},
})
},
CreateFunc: u.Create,
UpdateFunc: u.Update,
DeleteFunc: u.Delete,
GenericFunc: u.Generic,
}
Expand All @@ -199,3 +223,35 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
}

// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) {
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
if !isPriorityQueue {
q.Add(item)
return
}

var priority int
if isObjectUnchanged(evt) {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
}

// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedUpdateEvent[T], item request) {
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
if !isPriorityQueue {
q.Add(item)
return
}

var priority int
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
}
Loading