Skip to content

🐛Implement priorityqueue as default on handlers if using priorityqueue interface #3111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions pkg/handler/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,32 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{

item := reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}}

addToQueueCreate(q, evt, item)
}

// Update implements EventHandler.
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
switch {
case !isNil(evt.ObjectNew):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
item := reconcile.Request{NamespacedName: types.NamespacedName{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also do the same for Funcs? That is the only remaining handler

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is how you wanted this done but any feedback is appreciated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in Funcs we have to do the same dance as in TypedEnqueueRequestForObject because we export the type and not a constructor, otherwise the original goal of get this by default won't be achieved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I did what we did with TypedEnqueueRequestForObject because I can't add methods to the Funcs type. So I did the same thing we did with this.

Name: evt.ObjectNew.GetName(),
Namespace: evt.ObjectNew.GetNamespace(),
}})
}}

addToQueueUpdate(q, evt, item)
case !isNil(evt.ObjectOld):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
item := reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
}})
}}

addToQueueUpdate(q, evt, item)
default:
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/enqueue_owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type OwnerOption func(e enqueueRequestForOwnerInterface)
//
// - a handler.enqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true.
func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler {
return TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...)
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...))
}

// TypedEnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created
Expand Down
56 changes: 35 additions & 21 deletions pkg/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type EventHandler = TypedEventHandler[client.Object, reconcile.Request]
//
// Unless you are implementing your own TypedEventHandler, you can ignore the functions on the TypedEventHandler interface.
// Most users shouldn't need to implement their own TypedEventHandler.
//
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change breaks godoc rendering of TypedEventHandler. Now only l.69 shows up


// TypedEventHandler is experimental and subject to future change.
type TypedEventHandler[object any, request comparable] interface {
// Create is called in response to a create event - e.g. Pod Creation.
Expand Down Expand Up @@ -149,33 +149,15 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: trli,
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
if !isPriorityQueue {
q.Add(item)
return
}
var priority int
if isObjectUnchanged(tce) {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
addToQueueCreate(q, tce, item)
},
})
},
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: trli,
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
if !isPriorityQueue {
q.Add(item)
return
}
var priority int
if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
addToQueueUpdate(q, tue, item)
},
})
},
Expand All @@ -199,3 +181,35 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
}

// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) {
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
if !isPriorityQueue {
q.Add(item)
return
}

var priority int
if isObjectUnchanged(evt) {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
}

// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedUpdateEvent[T], item request) {
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
if !isPriorityQueue {
q.Add(item)
return
}

var priority int
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
}
124 changes: 122 additions & 2 deletions pkg/handler/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ var _ = Describe("Eventhandler", func() {
})

Describe("Funcs", func() {
failingFuncs := handler.Funcs{
failingFuncs := handler.TypedFuncs[client.Object, reconcile.Request]{
CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
defer GinkgoRecover()
Fail("Did not expect CreateEvent to be called.")
Expand Down Expand Up @@ -797,6 +797,27 @@ var _ = Describe("Eventhandler", func() {
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should lower the priority of a create request for an object that was created more than one minute in the past without the WithLowPriorityWrapper", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := &handler.EnqueueRequestForObject{}
h.Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
Expand All @@ -819,6 +840,28 @@ var _ = Describe("Eventhandler", func() {
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of a create request for an object that was created less than one minute in the past without the WithLowPriority wrapperr", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := &handler.EnqueueRequestForObject{}
h.Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
CreationTimestamp: metav1.Now(),
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should lower the priority of an update request with unchanged RV", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
Expand All @@ -843,6 +886,30 @@ var _ = Describe("Eventhandler", func() {
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should lower the priority of an update request with unchanged RV without the WithLowPriority wrapper", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := &handler.EnqueueRequestForObject{}
h.Update(ctx, event.UpdateEvent{
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of an update request with changed RV", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
Expand All @@ -868,6 +935,31 @@ var _ = Describe("Eventhandler", func() {
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of an update request with changed RV without the WithLowPriority wrapper", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
actualOpts = o
actualRequests = items
},
}

h := &handler.EnqueueRequestForObject{}
h.Update(ctx, event.UpdateEvent{
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
ResourceVersion: "1",
}},
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should have no effect on create if the workqueue is not a priorityqueue", func() {
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
h.Create(ctx, event.CreateEvent{
Expand All @@ -881,6 +973,19 @@ var _ = Describe("Eventhandler", func() {
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
})

It("should have no effect on create if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
h := &handler.EnqueueRequestForObject{}
h.Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, q)

Expect(q.Len()).To(Equal(1))
item, _ := q.Get()
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
})

It("should have no effect on Update if the workqueue is not a priorityqueue", func() {
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
h.Update(ctx, event.UpdateEvent{
Expand All @@ -896,8 +1001,23 @@ var _ = Describe("Eventhandler", func() {
item, _ := q.Get()
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
})
})

It("should have no effect on Update if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
h := &handler.EnqueueRequestForObject{}
h.Update(ctx, event.UpdateEvent{
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
}},
}, q)

Expect(q.Len()).To(Equal(1))
item, _ := q.Get()
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
})
})
})

type fakePriorityQueue struct {
Expand Down
Loading