Skip to content

Commit 22042c1

Browse files
committed
Support delaying requeue for objectkey.
1 parent 3db8d34 commit 22042c1

File tree

4 files changed

+80
-3
lines changed

4 files changed

+80
-3
lines changed

pkg/admission/cert/writer/internal/atomic/atomic_writer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ import (
2828
"strings"
2929
"testing"
3030

31+
log "github.com/go-logr/logr/testing"
3132
"k8s.io/apimachinery/pkg/util/sets"
3233
utiltesting "k8s.io/client-go/util/testing"
33-
log "github.com/go-logr/logr/testing"
3434
)
3535

3636
func TestNewAtomicWriter(t *testing.T) {

pkg/internal/controller/controller.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,12 @@ func (c *Controller) processNextWorkItem() bool {
210210

211211
return false
212212
} else if result.Requeue {
213-
c.Queue.AddRateLimited(req)
214-
return true
213+
if result.RequeueAfter != nil {
214+
c.Queue.AddAfter(req, *result.RequeueAfter)
215+
} else {
216+
c.Queue.AddRateLimited(req)
217+
return true
218+
}
215219
}
216220

217221
// Finally, if no error occurs we Forget this item so it does not

pkg/internal/controller/controller_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,14 +332,59 @@ var _ = Describe("controller", func() {
332332
defer GinkgoRecover()
333333
Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
334334
}()
335+
dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue}
336+
ctrl.Queue = dq
335337
ctrl.Queue.Add(request)
338+
Expect(dq.countAdd).To(Equal(1))
339+
Expect(dq.countAddAfter).To(Equal(0))
340+
Expect(dq.countAddRateLimited).To(Equal(0))
336341

337342
By("Invoking Reconciler which will ask for requeue")
338343
Expect(<-reconciled).To(Equal(request))
344+
Expect(dq.countAdd).To(Equal(1))
345+
Expect(dq.countAddAfter).To(Equal(0))
346+
Expect(dq.countAddRateLimited).To(Equal(1))
339347

340348
By("Invoking Reconciler a second time without asking for requeue")
341349
fakeReconcile.Result.Requeue = false
342350
Expect(<-reconciled).To(Equal(request))
351+
Expect(dq.countAdd).To(Equal(1))
352+
Expect(dq.countAddAfter).To(Equal(0))
353+
Expect(dq.countAddRateLimited).To(Equal(1))
354+
355+
By("Removing the item from the queue")
356+
Eventually(ctrl.Queue.Len).Should(Equal(0))
357+
Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0))
358+
})
359+
360+
It("should requeue a Request after a duration if the Result sets Requeue:true and "+
361+
"RequeueAfter is set", func() {
362+
after := time.Millisecond * 100
363+
fakeReconcile.Result.Requeue = true
364+
fakeReconcile.Result.RequeueAfter = &after
365+
go func() {
366+
defer GinkgoRecover()
367+
Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
368+
}()
369+
dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue}
370+
ctrl.Queue = dq
371+
ctrl.Queue.Add(request)
372+
Expect(dq.countAdd).To(Equal(1))
373+
Expect(dq.countAddAfter).To(Equal(0))
374+
Expect(dq.countAddRateLimited).To(Equal(0))
375+
376+
By("Invoking Reconciler which will ask for requeue")
377+
Expect(<-reconciled).To(Equal(request))
378+
Expect(dq.countAdd).To(Equal(1))
379+
Expect(dq.countAddAfter).To(Equal(1))
380+
Expect(dq.countAddRateLimited).To(Equal(0))
381+
382+
By("Invoking Reconciler a second time without asking for requeue")
383+
fakeReconcile.Result.Requeue = false
384+
Expect(<-reconciled).To(Equal(request))
385+
Expect(dq.countAdd).To(Equal(1))
386+
Expect(dq.countAddAfter).To(Equal(1))
387+
Expect(dq.countAddRateLimited).To(Equal(0))
343388

344389
By("Removing the item from the queue")
345390
Eventually(ctrl.Queue.Len).Should(Equal(0))
@@ -363,3 +408,26 @@ var _ = Describe("controller", func() {
363408
})
364409
})
365410
})
411+
412+
type DelegatingQueue struct {
413+
workqueue.RateLimitingInterface
414+
415+
countAddRateLimited int
416+
countAdd int
417+
countAddAfter int
418+
}
419+
420+
func (q *DelegatingQueue) AddRateLimited(item interface{}) {
421+
q.countAddRateLimited += 1
422+
q.RateLimitingInterface.AddRateLimited(item)
423+
}
424+
425+
func (q *DelegatingQueue) AddAfter(item interface{}, d time.Duration) {
426+
q.countAddAfter += 1
427+
q.RateLimitingInterface.AddAfter(item, d)
428+
}
429+
430+
func (q *DelegatingQueue) Add(item interface{}) {
431+
q.countAdd += 1
432+
q.RateLimitingInterface.Add(item)
433+
}

pkg/reconcile/reconcile.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@ limitations under the License.
1717
package reconcile
1818

1919
import (
20+
"time"
21+
2022
"k8s.io/apimachinery/pkg/types"
2123
)
2224

2325
// Result contains the result of a Reconciler invocation.
2426
type Result struct {
2527
// Requeue tells the Controller to requeue the reconcile key. Defaults to false.
2628
Requeue bool
29+
30+
// RequeueAfter is the duration to wait before adding the key back to the queue.
31+
RequeueAfter *time.Duration
2732
}
2833

2934
// Request contains the information necessary to reconcile a Kubernetes object. This includes the

0 commit comments

Comments
 (0)