Skip to content

Support delaying requeue for objectkey. #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"strings"
"testing"

log "github.com/go-logr/logr/testing"
"k8s.io/apimachinery/pkg/util/sets"
utiltesting "k8s.io/client-go/util/testing"
log "github.com/go-logr/logr/testing"
)

func TestNewAtomicWriter(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ func (c *Controller) processNextWorkItem() bool {
log.Error(err, "Reconciler error", "Controller", c.Name, "Request", req)

return false
} else if result.RequeueAfter > 0 {
c.Queue.AddAfter(req, result.RequeueAfter)
return true
} else if result.Requeue {
c.Queue.AddRateLimited(req)
return true
Expand Down
66 changes: 66 additions & 0 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,57 @@ var _ = Describe("controller", func() {
defer GinkgoRecover()
Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
}()
dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue}
ctrl.Queue = dq
ctrl.Queue.Add(request)
Expect(dq.countAdd).To(Equal(1))
Expect(dq.countAddAfter).To(Equal(0))
Expect(dq.countAddRateLimited).To(Equal(0))

By("Invoking Reconciler which will ask for requeue")
Expect(<-reconciled).To(Equal(request))
Expect(dq.countAdd).To(Equal(1))
Expect(dq.countAddAfter).To(Equal(0))
Expect(dq.countAddRateLimited).To(Equal(1))

By("Invoking Reconciler a second time without asking for requeue")
fakeReconcile.Result.Requeue = false
Expect(<-reconciled).To(Equal(request))
Expect(dq.countAdd).To(Equal(1))
Expect(dq.countAddAfter).To(Equal(0))
Expect(dq.countAddRateLimited).To(Equal(1))

By("Removing the item from the queue")
Eventually(ctrl.Queue.Len).Should(Equal(0))
Eventually(func() int { return ctrl.Queue.NumRequeues(request) }).Should(Equal(0))
})

It("should requeue a Request after a duration if the Result sets Requeue:true and "+
"RequeueAfter is set", func() {
fakeReconcile.Result.RequeueAfter = time.Millisecond * 100
go func() {
defer GinkgoRecover()
Expect(ctrl.Start(stop)).NotTo(HaveOccurred())
}()
dq := &DelegatingQueue{RateLimitingInterface: ctrl.Queue}
ctrl.Queue = dq
ctrl.Queue.Add(request)
Expect(dq.countAdd).To(Equal(1))
Expect(dq.countAddAfter).To(Equal(0))
Expect(dq.countAddRateLimited).To(Equal(0))

By("Invoking Reconciler which will ask for requeue")
Expect(<-reconciled).To(Equal(request))
Expect(dq.countAdd).To(Equal(1))
Expect(dq.countAddAfter).To(Equal(1))
Expect(dq.countAddRateLimited).To(Equal(0))

By("Invoking Reconciler a second time without asking for requeue")
fakeReconcile.Result.Requeue = false
Expect(<-reconciled).To(Equal(request))
Expect(dq.countAdd).To(Equal(1))
Expect(dq.countAddAfter).To(Equal(1))
Expect(dq.countAddRateLimited).To(Equal(0))

By("Removing the item from the queue")
Eventually(ctrl.Queue.Len).Should(Equal(0))
Expand All @@ -363,3 +406,26 @@ var _ = Describe("controller", func() {
})
})
})

type DelegatingQueue struct {
workqueue.RateLimitingInterface

countAddRateLimited int
countAdd int
countAddAfter int
}

func (q *DelegatingQueue) AddRateLimited(item interface{}) {
q.countAddRateLimited++
q.RateLimitingInterface.AddRateLimited(item)
}

func (q *DelegatingQueue) AddAfter(item interface{}, d time.Duration) {
q.countAddAfter++
q.RateLimitingInterface.AddAfter(item, d)
}

func (q *DelegatingQueue) Add(item interface{}) {
q.countAdd++
q.RateLimitingInterface.Add(item)
}
5 changes: 5 additions & 0 deletions pkg/reconcile/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ limitations under the License.
package reconcile

import (
"time"

"k8s.io/apimachinery/pkg/types"
)

// Result contains the result of a Reconciler invocation.
type Result struct {
// Requeue tells the Controller to requeue the reconcile key. Defaults to false.
Requeue bool

// RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
RequeueAfter time.Duration
}

// Request contains the information necessary to reconcile a Kubernetes object. This includes the
Expand Down