Skip to content

Commit 253f275

Browse files
authored
✨ Deduplicate events before sending them into the workqueue. (#1390)
* ✨ Deduplicate events before sending them into the workqueue. This avoids race conditions where extra reconciles can happen rarely. * ✨ Switch to map[string]struct{} to reduce memory usage slightly. Also make sure that enqueue_mapped preserves order. * 📝 Update function doc for getOwnerReconcileRequest. * 🎨 Fix up duplication tests and ensure Update for _mapped dedups between both objects.
1 parent b125a18 commit 253f275

File tree

4 files changed

+123
-91
lines changed

4 files changed

+123
-91
lines changed

pkg/handler/enqueue.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626

2727
var enqueueLog = logf.RuntimeLog.WithName("eventhandler").WithName("EnqueueRequestForObject")
2828

29+
type empty struct{}
30+
2931
var _ EventHandler = &EnqueueRequestForObject{}
3032

3133
// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
@@ -47,22 +49,18 @@ func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.Rate
4749

4850
// Update implements EventHandler
4951
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
50-
if evt.ObjectOld != nil {
51-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
52-
Name: evt.ObjectOld.GetName(),
53-
Namespace: evt.ObjectOld.GetNamespace(),
54-
}})
55-
} else {
56-
enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)
57-
}
58-
5952
if evt.ObjectNew != nil {
6053
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
6154
Name: evt.ObjectNew.GetName(),
6255
Namespace: evt.ObjectNew.GetNamespace(),
6356
}})
57+
} else if evt.ObjectOld != nil {
58+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
59+
Name: evt.ObjectOld.GetName(),
60+
Namespace: evt.ObjectOld.GetNamespace(),
61+
}})
6462
} else {
65-
enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)
63+
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
6664
}
6765
}
6866

pkg/handler/enqueue_mapped.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,28 +53,36 @@ type enqueueRequestsFromMapFunc struct {
5353

5454
// Create implements EventHandler
5555
func (e *enqueueRequestsFromMapFunc) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
56-
e.mapAndEnqueue(q, evt.Object)
56+
reqs := map[reconcile.Request]empty{}
57+
e.mapAndEnqueue(q, evt.Object, reqs)
5758
}
5859

5960
// Update implements EventHandler
6061
func (e *enqueueRequestsFromMapFunc) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
61-
e.mapAndEnqueue(q, evt.ObjectOld)
62-
e.mapAndEnqueue(q, evt.ObjectNew)
62+
reqs := map[reconcile.Request]empty{}
63+
e.mapAndEnqueue(q, evt.ObjectOld, reqs)
64+
e.mapAndEnqueue(q, evt.ObjectNew, reqs)
6365
}
6466

6567
// Delete implements EventHandler
6668
func (e *enqueueRequestsFromMapFunc) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
67-
e.mapAndEnqueue(q, evt.Object)
69+
reqs := map[reconcile.Request]empty{}
70+
e.mapAndEnqueue(q, evt.Object, reqs)
6871
}
6972

7073
// Generic implements EventHandler
7174
func (e *enqueueRequestsFromMapFunc) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
72-
e.mapAndEnqueue(q, evt.Object)
75+
reqs := map[reconcile.Request]empty{}
76+
e.mapAndEnqueue(q, evt.Object, reqs)
7377
}
7478

75-
func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(q workqueue.RateLimitingInterface, object client.Object) {
79+
func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(q workqueue.RateLimitingInterface, object client.Object, reqs map[reconcile.Request]empty) {
7680
for _, req := range e.toRequests(object) {
77-
q.Add(req)
81+
_, ok := reqs[req]
82+
if !ok {
83+
q.Add(req)
84+
reqs[req] = empty{}
85+
}
7886
}
7987
}
8088

pkg/handler/enqueue_owner.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,31 +59,37 @@ type EnqueueRequestForOwner struct {
5959

6060
// Create implements EventHandler
6161
func (e *EnqueueRequestForOwner) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
62-
for _, req := range e.getOwnerReconcileRequest(evt.Object) {
62+
reqs := map[reconcile.Request]empty{}
63+
e.getOwnerReconcileRequest(evt.Object, reqs)
64+
for req := range reqs {
6365
q.Add(req)
6466
}
6567
}
6668

6769
// Update implements EventHandler
6870
func (e *EnqueueRequestForOwner) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
69-
for _, req := range e.getOwnerReconcileRequest(evt.ObjectOld) {
70-
q.Add(req)
71-
}
72-
for _, req := range e.getOwnerReconcileRequest(evt.ObjectNew) {
71+
reqs := map[reconcile.Request]empty{}
72+
e.getOwnerReconcileRequest(evt.ObjectOld, reqs)
73+
e.getOwnerReconcileRequest(evt.ObjectNew, reqs)
74+
for req := range reqs {
7375
q.Add(req)
7476
}
7577
}
7678

7779
// Delete implements EventHandler
7880
func (e *EnqueueRequestForOwner) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
79-
for _, req := range e.getOwnerReconcileRequest(evt.Object) {
81+
reqs := map[reconcile.Request]empty{}
82+
e.getOwnerReconcileRequest(evt.Object, reqs)
83+
for req := range reqs {
8084
q.Add(req)
8185
}
8286
}
8387

8488
// Generic implements EventHandler
8589
func (e *EnqueueRequestForOwner) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
86-
for _, req := range e.getOwnerReconcileRequest(evt.Object) {
90+
reqs := map[reconcile.Request]empty{}
91+
e.getOwnerReconcileRequest(evt.Object, reqs)
92+
for req := range reqs {
8793
q.Add(req)
8894
}
8995
}
@@ -109,19 +115,18 @@ func (e *EnqueueRequestForOwner) parseOwnerTypeGroupKind(scheme *runtime.Scheme)
109115
return nil
110116
}
111117

112-
// getOwnerReconcileRequest looks at object and returns a slice of reconcile.Request to reconcile
118+
// getOwnerReconcileRequest looks at object and builds a map of reconcile.Request to reconcile
113119
// owners of object that match e.OwnerType.
114-
func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object) []reconcile.Request {
120+
func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, result map[reconcile.Request]empty) {
115121
// Iterate through the OwnerReferences looking for a match on Group and Kind against what was requested
116122
// by the user
117-
var result []reconcile.Request
118123
for _, ref := range e.getOwnersReferences(object) {
119124
// Parse the Group out of the OwnerReference to compare it to what was parsed out of the requested OwnerType
120125
refGV, err := schema.ParseGroupVersion(ref.APIVersion)
121126
if err != nil {
122127
log.Error(err, "Could not parse OwnerReference APIVersion",
123128
"api version", ref.APIVersion)
124-
return nil
129+
return
125130
}
126131

127132
// Compare the OwnerReference Group and Kind against the OwnerType Group and Kind specified by the user.
@@ -138,18 +143,15 @@ func (e *EnqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object)
138143
mapping, err := e.mapper.RESTMapping(e.groupKind, refGV.Version)
139144
if err != nil {
140145
log.Error(err, "Could not retrieve rest mapping", "kind", e.groupKind)
141-
return nil
146+
return
142147
}
143148
if mapping.Scope.Name() != meta.RESTScopeNameRoot {
144149
request.Namespace = object.GetNamespace()
145150
}
146151

147-
result = append(result, request)
152+
result[request] = empty{}
148153
}
149154
}
150-
151-
// Return the matches
152-
return result
153155
}
154156

155157
// getOwnersReferences returns the OwnerReferences for an object as specified by the EnqueueRequestForOwner

pkg/handler/eventhandler_test.go

Lines changed: 82 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ var _ = Describe("Eventhandler", func() {
8686
close(done)
8787
})
8888

89-
It("should enqueue a Request with the Name / Namespace of both objects in the UpdateEvent.",
89+
It("should enqueue a Request with the Name / Namespace of one object in the UpdateEvent.",
9090
func(done Done) {
9191
newPod := pod.DeepCopy()
9292
newPod.Name = "baz2"
@@ -97,18 +97,12 @@ var _ = Describe("Eventhandler", func() {
9797
ObjectNew: newPod,
9898
}
9999
instance.Update(evt, q)
100-
Expect(q.Len()).To(Equal(2))
100+
Expect(q.Len()).To(Equal(1))
101101

102102
i, _ := q.Get()
103103
Expect(i).NotTo(BeNil())
104104
req, ok := i.(reconcile.Request)
105105
Expect(ok).To(BeTrue())
106-
Expect(req.NamespacedName).To(Equal(types.NamespacedName{Namespace: "biz", Name: "baz"}))
107-
108-
i, _ = q.Get()
109-
Expect(i).NotTo(BeNil())
110-
req, ok = i.(reconcile.Request)
111-
Expect(ok).To(BeTrue())
112106
Expect(req.NamespacedName).To(Equal(types.NamespacedName{Namespace: "biz2", Name: "baz2"}))
113107

114108
close(done)
@@ -212,13 +206,14 @@ var _ = Describe("Eventhandler", func() {
212206
instance.Create(evt, q)
213207
Expect(q.Len()).To(Equal(2))
214208

215-
i, _ := q.Get()
216-
Expect(i).To(Equal(reconcile.Request{
217-
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}}))
218-
219-
i, _ = q.Get()
220-
Expect(i).To(Equal(reconcile.Request{
221-
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz"}}))
209+
i1, _ := q.Get()
210+
i2, _ := q.Get()
211+
Expect([]interface{}{i1, i2}).To(ConsistOf(
212+
reconcile.Request{
213+
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}},
214+
reconcile.Request{
215+
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz"}},
216+
))
222217
})
223218

224219
It("should enqueue a Request with the function applied to the DeleteEvent.", func() {
@@ -243,20 +238,19 @@ var _ = Describe("Eventhandler", func() {
243238
instance.Delete(evt, q)
244239
Expect(q.Len()).To(Equal(2))
245240

246-
i, _ := q.Get()
247-
Expect(i).To(Equal(reconcile.Request{
248-
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}}))
249-
250-
i, _ = q.Get()
251-
Expect(i).To(Equal(reconcile.Request{
252-
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz"}}))
241+
i1, _ := q.Get()
242+
i2, _ := q.Get()
243+
Expect([]interface{}{i1, i2}).To(ConsistOf(
244+
reconcile.Request{
245+
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}},
246+
reconcile.Request{
247+
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz"}},
248+
))
253249
})
254250

255251
It("should enqueue a Request with the function applied to both objects in the UpdateEvent.",
256252
func() {
257253
newPod := pod.DeepCopy()
258-
newPod.Name = pod.Name + "2"
259-
newPod.Namespace = pod.Namespace + "2"
260254

261255
req := []reconcile.Request{}
262256

@@ -278,23 +272,13 @@ var _ = Describe("Eventhandler", func() {
278272
ObjectNew: newPod,
279273
}
280274
instance.Update(evt, q)
281-
Expect(q.Len()).To(Equal(4))
275+
Expect(q.Len()).To(Equal(2))
282276

283277
i, _ := q.Get()
284-
Expect(i).To(Equal(reconcile.Request{
285-
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "baz-bar"}}))
278+
Expect(i).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "baz-bar"}}))
286279

287280
i, _ = q.Get()
288-
Expect(i).To(Equal(reconcile.Request{
289-
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz-baz"}}))
290-
291-
i, _ = q.Get()
292-
Expect(i).To(Equal(reconcile.Request{
293-
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "baz2-bar"}}))
294-
295-
i, _ = q.Get()
296-
Expect(i).To(Equal(reconcile.Request{
297-
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz2-baz"}}))
281+
Expect(i).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz-baz"}}))
298282
})
299283

300284
It("should enqueue a Request with the function applied to the GenericEvent.", func() {
@@ -319,13 +303,14 @@ var _ = Describe("Eventhandler", func() {
319303
instance.Generic(evt, q)
320304
Expect(q.Len()).To(Equal(2))
321305

322-
i, _ := q.Get()
323-
Expect(i).To(Equal(reconcile.Request{
324-
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}}))
325-
326-
i, _ = q.Get()
327-
Expect(i).To(Equal(reconcile.Request{
328-
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz"}}))
306+
i1, _ := q.Get()
307+
i2, _ := q.Get()
308+
Expect([]interface{}{i1, i2}).To(ConsistOf(
309+
reconcile.Request{
310+
NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}},
311+
reconcile.Request{
312+
NamespacedName: types.NamespacedName{Namespace: "biz", Name: "baz"}},
313+
))
329314
})
330315
})
331316

@@ -412,13 +397,50 @@ var _ = Describe("Eventhandler", func() {
412397
instance.Update(evt, q)
413398
Expect(q.Len()).To(Equal(2))
414399

415-
i, _ := q.Get()
416-
Expect(i).To(Equal(reconcile.Request{
417-
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo1-parent"}}))
400+
i1, _ := q.Get()
401+
i2, _ := q.Get()
402+
Expect([]interface{}{i1, i2}).To(ConsistOf(
403+
reconcile.Request{
404+
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo1-parent"}},
405+
reconcile.Request{
406+
NamespacedName: types.NamespacedName{Namespace: newPod.GetNamespace(), Name: "foo2-parent"}},
407+
))
408+
})
418409

419-
i, _ = q.Get()
410+
It("should enqueue a Request with the one duplicate Owner of both objects in the UpdateEvent.", func() {
411+
newPod := pod.DeepCopy()
412+
newPod.Name = pod.Name + "2"
413+
414+
instance := handler.EnqueueRequestForOwner{
415+
OwnerType: &appsv1.ReplicaSet{},
416+
}
417+
Expect(instance.InjectScheme(scheme.Scheme)).To(Succeed())
418+
Expect(instance.InjectMapper(mapper)).To(Succeed())
419+
420+
pod.OwnerReferences = []metav1.OwnerReference{
421+
{
422+
Name: "foo-parent",
423+
Kind: "ReplicaSet",
424+
APIVersion: "apps/v1",
425+
},
426+
}
427+
newPod.OwnerReferences = []metav1.OwnerReference{
428+
{
429+
Name: "foo-parent",
430+
Kind: "ReplicaSet",
431+
APIVersion: "apps/v1",
432+
},
433+
}
434+
evt := event.UpdateEvent{
435+
ObjectOld: pod,
436+
ObjectNew: newPod,
437+
}
438+
instance.Update(evt, q)
439+
Expect(q.Len()).To(Equal(1))
440+
441+
i, _ := q.Get()
420442
Expect(i).To(Equal(reconcile.Request{
421-
NamespacedName: types.NamespacedName{Namespace: newPod.GetNamespace(), Name: "foo2-parent"}}))
443+
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo-parent"}}))
422444
})
423445

424446
It("should enqueue a Request with the Owner of the object in the GenericEvent.", func() {
@@ -659,15 +681,17 @@ var _ = Describe("Eventhandler", func() {
659681
instance.Create(evt, q)
660682
Expect(q.Len()).To(Equal(3))
661683

662-
i, _ := q.Get()
663-
Expect(i).To(Equal(reconcile.Request{
664-
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo1-parent"}}))
665-
i, _ = q.Get()
666-
Expect(i).To(Equal(reconcile.Request{
667-
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo2-parent"}}))
668-
i, _ = q.Get()
669-
Expect(i).To(Equal(reconcile.Request{
670-
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo3-parent"}}))
684+
i1, _ := q.Get()
685+
i2, _ := q.Get()
686+
i3, _ := q.Get()
687+
Expect([]interface{}{i1, i2, i3}).To(ConsistOf(
688+
reconcile.Request{
689+
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo1-parent"}},
690+
reconcile.Request{
691+
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo2-parent"}},
692+
reconcile.Request{
693+
NamespacedName: types.NamespacedName{Namespace: pod.GetNamespace(), Name: "foo3-parent"}},
694+
))
671695
})
672696
})
673697

0 commit comments

Comments
 (0)