-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Implement the ChannelSource #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
pkg/controller/controller_manager.go
Outdated
@@ -166,6 +175,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error { | |||
cm.stop = stop | |||
cm.informers.Start(stop) | |||
|
|||
// Start the outer informer | |||
cm.outerInformer.Run(stop) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should be consistent about Start vs Run in our naming
// (eh.g. GitHub Webhook callback). ChannelSource requires the user to wire the external | ||
// source (eh.g. http handler) to write GenericEvents to the underlying channel. | ||
type ChannelSource struct { | ||
informer *ChannelSourceInformer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need an informer for this? I envisioned reading events from the Channel and passing them directly to the Event handler
return fmt.Errorf("must call InjectInformer on ChannelSource before calling Start") | ||
} | ||
|
||
cs.informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of adding an EventHandler, why not add the event directly to the Channel. e.g.
type ChannelSource struct {
once sync.Once
Source chan event.GenericEvent
stop chan <-struct{}
dest []chan event.GenericEvent
}
func (cs *ChannelSource) InjectStop(stop chan <-struct{}) {cs.stop = stop}
func (cs *ChannelSource) Start(
handler eventhandler.EventHandler,
queue workqueue.RateLimitingInterface) error {
cs.once.Do(func() {
// Fanout GenericEvents to all EventHandler / Queue pairs Watching this source
go func() {
// TODO: Make this select on stop and end the routine
// TODO: Handle closed channel
for _, evt := range cs.Source {
for _, dst := range cs.dest {
// TODO: Make this select on stop and end the routine
go func() { dst <- evt }
}
}
}
})
dst := make(chan event.GenericEvent)
go func() { for evt := range dst {handler.Generic(evt, queue)} }
cs.dst = append(cs.dst, dst)
}
Hey can we keep this in another PR. I think it would be useful for a SlackBotSource or a GitHubSource of events. |
fa5d8e9
to
6cab6c3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to a simpler way.
Will keep the "informer mode" in my private repo, in case we need it someday :)
Still I'm not very happy with the once.Do
part. It's in fact similar to the startup behavior of an informer.
Maybe we should explicitly perform it outside the func, rather than inject a stop channel into the source
pkg/controller/source/source.go
Outdated
select { | ||
case <-cs.stop: | ||
// Close source channel. | ||
// TODO: should we close the source channel here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if we need to close the source channel here.
Maybe users meant to use it somewhere out of the controller, and handle its lifecycle themselves?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the lifecycle of the source channel should be completely managed by the user. We shouldn't create it or close it for them.
pkg/controller/manager.go
Outdated
@@ -139,6 +143,12 @@ func (cm *controllerManager) injectInto(i interface{}) error { | |||
if _, err := inject.DoInformers(cm.informers, i); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we need a fix here.
We expect to have it inject whichever params that i
implement, rather than return directly once an error occur
3d5cfcd
to
039baf6
Compare
pkg/controller/source/source.go
Outdated
} | ||
|
||
// stop should have been injected before Start was called | ||
if cs.stop == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bug in the error you copied :P This should be InjectSourceChannel instead of DoStop
pkg/controller/source/source.go
Outdated
|
||
// InjectSourceChannel is internal should be called only by the Controller. | ||
// It is used to inject the source event channel initialized by the ControllerManager. | ||
func (cs *ChannelSource) InjectSourceChannel(c chan event.GenericEvent) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think of having the user provide the channel when creating the object (like Type on KindSource)?
Private Injected fields are initialized by the ControllerManager. Public fields should be supplied by the user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say I haven't got enough knowledge on how users will use it.
I think you have more say over this point, so let's make it the way you suggested :)
pkg/controller/source/source.go
Outdated
// once ensures the event distribution goroutine will be performed only once | ||
once sync.Once | ||
// source is the source channel to fetch GenericEvents | ||
source chan event.GenericEvent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering if this should be a read channel only. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch
pkg/controller/source/source.go
Outdated
close(cs.source) | ||
|
||
// Close destination channels | ||
cs.destLock.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if this is a common pattern, but if we wrap this in an in-line function we can call defer unlock.
func() {
cs.destLock.Lock()
defer cs.destLock.Unlock()
for _, dst := range cs.dest {
close(dst)
}
}()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Graceful vs Easy to understand
I'm fine with both
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or
case <-cs.stop:
cs.doStop()
case evt := <-cs.source:
cs.DoEvent(evt)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that makes sense
pkg/controller/source/source.go
Outdated
case evt := <-cs.source: | ||
cs.destLock.Lock() | ||
for _, dst := range cs.dest { | ||
dst <- evt |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be synchronous or asynchronous? With synchronous if one of the EventHandlers blocks, it will block all of them. Is there any advantage to this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's what I meant to discuss with you. I guess we might have to tolerate this.
if put it under a goroutine, we'll have to face the race condition of write message to closed channel, even if we have lock here, it's out of control.
Most obviously, you'll see error in go vet
if we make it asynchronous
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this is still asynchronous because of the buffer. Is the buffer a more common / better pattern? Please leave a comment with the information for future readers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I think the buffer should have been big enough normally, and if we still see it blocked, the controller is thought to be in an abnormal state.
Will add comment for clarification.
pkg/runtime/inject/inject.go
Outdated
|
||
// DoSourceChannel will set source channel on i and return the result if it implements SourceChannel. | ||
// Returns false if i does not implement SourceChannel. | ||
func DoSourceChannel(ch chan event.GenericEvent, i interface{}) (bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets no inject this.
pkg/controller/source/source.go
Outdated
@@ -31,6 +32,11 @@ import ( | |||
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log" | |||
) | |||
|
|||
const ( | |||
// initialBufferSize is the initial number of event notifications that can be buffered. | |||
initialBufferSize = 1024 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets make this a variable on the ChannelSource and write a test for what happens when we run out of buffer (e.g. we can set the buffer size to 1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So when do you think it should be specified?
Or we'll in fact always use the default value, and the field will only be changed in test?
pkg/controller/source/source_test.go
Outdated
@@ -203,4 +204,57 @@ var _ = Describe("Source", func() { | |||
}) | |||
}) | |||
}) | |||
Describe("ChannelSource", func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a test for when the ChannelSource is reused by multiple watches with different EventHandlers.
pkg/controller/source/source_test.go
Outdated
var c chan struct{} | ||
var ch chan event.GenericEvent | ||
|
||
BeforeEach(func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add tests for error conditions where the stop or channel are not specified.
pkg/controller/source/source_test.go
Outdated
|
||
corev1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets also add a controller_integration_test.go case that exercises the end-to-end flow.
This looks good. A few minor comments. |
c421290
to
e844cac
Compare
pkg/controller/source/source.go
Outdated
dest []chan event.GenericEvent | ||
|
||
// DestBufferSize is the specified buffer size of dest channels | ||
DestBufferSize int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
followup: Include the default value in the comment.
pkg/controller/source/source_test.go
Outdated
It("should block if exceed buffer size", func(done Done) { | ||
ch := make(chan event.GenericEvent) | ||
evt := event.GenericEvent{} | ||
interval := 5 * time.Second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will make the test run a really long time. Instead of sleeping, consider waiting on a signal / channel.
pkg/controller/source/source_test.go
Outdated
}, | ||
GenericFunc: func(q2 workqueue.RateLimitingInterface, evt event.GenericEvent) { | ||
defer GinkgoRecover() | ||
time.Sleep(interval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't sleep, wait on a channel or something
pkg/controller/source/source_test.go
Outdated
distributeInterval := time.Now().Sub(beforeEvent) | ||
Expect(distributeInterval >= interval).To(BeTrue()) | ||
close(done) | ||
}, 15) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep the timeout at 1 second
pkg/controller/source/source_test.go
Outdated
<-c | ||
close(done) | ||
}) | ||
It("should block if exceed buffer size", func(done Done) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we care less that it blocks, and more that it doesn't blow up. I think the main thing we want to test is that we can write 3 events to the source channel while the eventhandler is blocked, then unblock the eventhandler, and all the 3 events will get processed.
lgtm once the test run time is reduce back down to sub second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look ok. However I am still struggling to come up with a concrete use case :).
Thinking more about this.
pkg/controller/source/source.go
Outdated
|
||
// InjectStop is internal should be called only by the Controller. | ||
// It is used to inject the stop channel initialized by the ControllerManager. | ||
func (cs *ChannelSource) InjectStop(stop <-chan struct{}) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better name it explicit InjectStopChannel
?
pkg/runtime/inject/inject.go
Outdated
|
||
// Stop is used by the ControllerManager to inject stop channel into Sources, EventHandlers, Predicates, and | ||
// Reconciles | ||
type Stop interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stoppable
?
pkg/controller/source/source.go
Outdated
@@ -45,21 +51,129 @@ type Source interface { | |||
Start(eventhandler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error | |||
} | |||
|
|||
var _ Source = &ChannelSource{} | |||
|
|||
// ChannelSource is used to provide a source of events originating outside the cluster |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am trying to understand the correct usage pattern for the channel-source so that we can advise users accordingly.
For external triggers which maps to changes in a type
in K8s, controller will automatically pick them up because it is watching that type.
Now left are the ones which don't result in any data change in the type directly but impact the type, channel source is the right fit for those.
We have to come up with a better example demonstrating this source.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this is of great importance. Maybe we can make it when constructing sample controller.
Rebased on the latest code; also, addressed the comments :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
/approve |
Implement the ChannelSource
…min-cluster Create minimal clusterscoped cache
Address: kubernetes-sigs/kubebuilder#254