Skip to content

Commit c5268a8

Browse files
authored
Merge pull request kubernetes-sigs#131 from pwittrock/requeue-after
Support delaying requeue for objectkey.
2 parents 4b39913 + f8022fe commit c5268a8

File tree

4 files changed

+75
-1
lines changed

4 files changed

+75
-1
lines changed

pkg/admission/cert/writer/internal/atomic/atomic_writer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ import (
2828
"strings"
2929
"testing"
3030

31+
log "github.com/go-logr/logr/testing"
3132
"k8s.io/apimachinery/pkg/util/sets"
3233
utiltesting "k8s.io/client-go/util/testing"
33-
log "github.com/go-logr/logr/testing"
3434
)
3535

3636
func TestNewAtomicWriter(t *testing.T) {

pkg/internal/controller/controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ func (c *Controller) processNextWorkItem() bool {
209209
log.Error(err, "Reconciler error", "Controller", c.Name, "Request", req)
210210

211211
return false
212+
} else if result.RequeueAfter > 0 {
213+
c.Queue.AddAfter(req, result.RequeueAfter)
214+
return true
212215
} else if result.Requeue {
213216
c.Queue.AddRateLimited(req)
214217
return true

pkg/internal/controller/controller_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,14 +332,57 @@ var _ = Describe("controller", func() {
332332
defer GinkgoRecover()
333333
Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
334334
}()
335+
dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue}
336+
ctrl.Queue = dq
335337
ctrl.Queue.Add(request)
338+
Expect(dq.countAdd).To(Equal(1))
339+
Expect(dq.countAddAfter).To(Equal(0))
340+
Expect(dq.countAddRateLimited).To(Equal(0))
336341

337342
By("Invoking Reconciler which will ask for requeue")
338343
Expect(<-reconciled).To(Equal(request))
344+
Expect(dq.countAdd).To(Equal(1))
345+
Expect(dq.countAddAfter).To(Equal(0))
346+
Expect(dq.countAddRateLimited).To(Equal(1))
339347

340348
By("Invoking Reconciler a second time without asking for requeue")
341349
fakeReconcile.Result.Requeue = false
342350
Expect(<-reconciled).To(Equal(request))
351+
Expect(dq.countAdd).To(Equal(1))
352+
Expect(dq.countAddAfter).To(Equal(0))
353+
Expect(dq.countAddRateLimited).To(Equal(1))
354+
355+
By("Removing the item from the queue")
356+
Eventually(ctrl.Queue.Len).Should(Equal(0))
357+
Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0))
358+
})
359+
360+
It("should requeue a Request after a duration if the Result sets Requeue:true and "+
361+
"RequeueAfter is set", func() {
362+
fakeReconcile.Result.RequeueAfter = time.Millisecond * 100
363+
go func() {
364+
defer GinkgoRecover()
365+
Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
366+
}()
367+
dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue}
368+
ctrl.Queue = dq
369+
ctrl.Queue.Add(request)
370+
Expect(dq.countAdd).To(Equal(1))
371+
Expect(dq.countAddAfter).To(Equal(0))
372+
Expect(dq.countAddRateLimited).To(Equal(0))
373+
374+
By("Invoking Reconciler which will ask for requeue")
375+
Expect(<-reconciled).To(Equal(request))
376+
Expect(dq.countAdd).To(Equal(1))
377+
Expect(dq.countAddAfter).To(Equal(1))
378+
Expect(dq.countAddRateLimited).To(Equal(0))
379+
380+
By("Invoking Reconciler a second time without asking for requeue")
381+
fakeReconcile.Result.Requeue = false
382+
Expect(<-reconciled).To(Equal(request))
383+
Expect(dq.countAdd).To(Equal(1))
384+
Expect(dq.countAddAfter).To(Equal(1))
385+
Expect(dq.countAddRateLimited).To(Equal(0))
343386

344387
By("Removing the item from the queue")
345388
Eventually(ctrl.Queue.Len).Should(Equal(0))
@@ -363,3 +406,26 @@ var _ = Describe("controller", func() {
363406
})
364407
})
365408
})
409+
410+
type DelegatingQueue struct {
411+
workqueue.RateLimitingInterface
412+
413+
countAddRateLimited int
414+
countAdd int
415+
countAddAfter int
416+
}
417+
418+
func (q *DelegatingQueue) AddRateLimited(item interface{}) {
419+
q.countAddRateLimited++
420+
q.RateLimitingInterface.AddRateLimited(item)
421+
}
422+
423+
func (q *DelegatingQueue) AddAfter(item interface{}, d time.Duration) {
424+
q.countAddAfter++
425+
q.RateLimitingInterface.AddAfter(item, d)
426+
}
427+
428+
func (q *DelegatingQueue) Add(item interface{}) {
429+
q.countAdd++
430+
q.RateLimitingInterface.Add(item)
431+
}

pkg/reconcile/reconcile.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@ limitations under the License.
1717
package reconcile
1818

1919
import (
20+
"time"
21+
2022
"k8s.io/apimachinery/pkg/types"
2123
)
2224

2325
// Result contains the result of a Reconciler invocation.
2426
type Result struct {
2527
// Requeue tells the Controller to requeue the reconcile key. Defaults to false.
2628
Requeue bool
29+
30+
// RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
31+
RequeueAfter time.Duration
2732
}
2833

2934
// Request contains the information necessary to reconcile a Kubernetes object. This includes the

0 commit comments

Comments
 (0)