Skip to content

Commit ab55aa7

Browse files
authored
Merge pull request #1146 from alvaroaleman/source-cope-with-prepopulated
🐛 Source.Channel: Cope with pre-existing events in the channel
2 parents 4717461 + 213acce commit ab55aa7

File tree

2 files changed

+40
-3
lines changed

2 files changed

+40
-3
lines changed

pkg/source/source.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,12 +213,14 @@ func (cs *Channel) Start(
213213
cs.DestBufferSize = defaultBufferSize
214214
}
215215

216+
dst := make(chan event.GenericEvent, cs.DestBufferSize)
217+
cs.dest = append(cs.dest, dst)
218+
216219
cs.once.Do(func() {
217220
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
218221
go cs.syncLoop()
219222
})
220223

221-
dst := make(chan event.GenericEvent, cs.DestBufferSize)
222224
go func() {
223225
for evt := range dst {
224226
shouldHandle := true
@@ -238,8 +240,6 @@ func (cs *Channel) Start(
238240
cs.destLock.Lock()
239241
defer cs.destLock.Unlock()
240242

241-
cs.dest = append(cs.dest, dst)
242-
243243
return nil
244244
}
245245

pkg/source/source_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,43 @@ var _ = Describe("Source", func() {
405405

406406
close(done)
407407
})
408+
It("should be able to cope with events in the channel before the source is started", func(done Done) {
409+
ch := make(chan event.GenericEvent, 1)
410+
processed := make(chan struct{})
411+
evt := event.GenericEvent{}
412+
ch <- evt
413+
414+
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
415+
// Add a handler to get distribution blocked
416+
instance := &source.Channel{Source: ch}
417+
instance.DestBufferSize = 1
418+
Expect(inject.StopChannelInto(stop, instance)).To(BeTrue())
419+
420+
err := instance.Start(handler.Funcs{
421+
CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) {
422+
defer GinkgoRecover()
423+
Fail("Unexpected CreateEvent")
424+
},
425+
UpdateFunc: func(event.UpdateEvent, workqueue.RateLimitingInterface) {
426+
defer GinkgoRecover()
427+
Fail("Unexpected UpdateEvent")
428+
},
429+
DeleteFunc: func(event.DeleteEvent, workqueue.RateLimitingInterface) {
430+
defer GinkgoRecover()
431+
Fail("Unexpected DeleteEvent")
432+
},
433+
GenericFunc: func(evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
434+
defer GinkgoRecover()
435+
436+
close(processed)
437+
},
438+
}, q)
439+
Expect(err).NotTo(HaveOccurred())
440+
441+
<-processed
442+
443+
close(done)
444+
})
408445
It("should get error if no source specified", func(done Done) {
409446
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
410447
instance := &source.Channel{ /*no source specified*/ }

0 commit comments

Comments
 (0)