Implement the ChannelSource #6

Merged
merged 1 commit into from
Jun 12, 2018

Conversation

lichuqiang
Contributor

@k8s-ci-robot k8s-ci-robot added do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Jun 8, 2018
@@ -166,6 +175,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
cm.stop = stop
cm.informers.Start(stop)

// Start the outer informer
cm.outerInformer.Run(stop)
Contributor

We should be consistent about Start vs Run in our naming

// (e.g. GitHub Webhook callback). ChannelSource requires the user to wire the external
// source (e.g. http handler) to write GenericEvents to the underlying channel.
type ChannelSource struct {
informer *ChannelSourceInformer
Contributor

Why do we need an informer for this? I envisioned reading events from the Channel and passing them directly to the Event handler

return fmt.Errorf("must call InjectInformer on ChannelSource before calling Start")
}

cs.informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler})
Contributor

@pwittrock pwittrock Jun 8, 2018

Instead of adding an EventHandler, why not add the event directly to the Channel? E.g.:

type ChannelSource struct {
  once   sync.Once
  Source chan event.GenericEvent
  stop   <-chan struct{}
  dest   []chan event.GenericEvent
}

func (cs *ChannelSource) InjectStop(stop <-chan struct{}) { cs.stop = stop }

func (cs *ChannelSource) Start(
	handler eventhandler.EventHandler,
	queue workqueue.RateLimitingInterface) error {
  cs.once.Do(func() {
    // Fan out GenericEvents to all EventHandler / Queue pairs watching this source
    go func() {
      // TODO: Make this select on stop and end the routine
      // TODO: Handle closed channel
      // TODO: Guard cs.dest with a lock; Start appends to it concurrently
      for evt := range cs.Source {
        for _, dst := range cs.dest {
          // TODO: Make this select on stop and end the routine
          // Pass dst and evt as arguments so each goroutine gets its own copy
          go func(dst chan event.GenericEvent, evt event.GenericEvent) { dst <- evt }(dst, evt)
        }
      }
    }()
  })

  dst := make(chan event.GenericEvent)
  go func() { for evt := range dst { handler.Generic(queue, evt) } }()
  cs.dest = append(cs.dest, dst)
  return nil
}

@pwittrock
Contributor

Hey, can we keep this in another PR? I think it would be useful for a SlackBotSource or a GitHubSource of events.

@lichuqiang lichuqiang force-pushed the master branch 2 times, most recently from fa5d8e9 to 6cab6c3 Compare June 9, 2018 08:05
Contributor Author

@lichuqiang lichuqiang left a comment

Changed to a simpler way.
Will keep the "informer mode" in my private repo, in case we need it someday :)

Still I'm not very happy with the once.Do part. It's in fact similar to the startup behavior of an informer.
Maybe we should explicitly perform it outside the func, rather than inject a stop channel into the source

select {
case <-cs.stop:
// Close source channel.
// TODO: should we close the source channel here?
Contributor Author

Not sure if we need to close the source channel here.
Maybe users meant to use it somewhere out of the controller, and handle its lifecycle themselves?

Contributor

I think the lifecycle of the source channel should be completely managed by the user. We shouldn't create it or close it for them.
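
A minimal sketch of what "user-managed lifecycle" could look like in the distribution loop, assuming a stand-in `GenericEvent` type and a hypothetical `forward` helper (neither is taken from the PR). The loop exits when `stop` is closed or when the user closes `source`, and it only ever closes the destination channel it owns:

```go
package main

import "fmt"

// GenericEvent is a stand-in for event.GenericEvent (assumption for this sketch).
type GenericEvent struct{ Name string }

// forward (hypothetical helper) copies events from source to dest until stop
// is closed or the user closes source. It never closes source itself.
func forward(stop <-chan struct{}, source <-chan GenericEvent, dest chan<- GenericEvent) {
	defer close(dest) // dest is owned by this function, so closing it here is safe
	for {
		select {
		case <-stop:
			return
		case evt, ok := <-source:
			if !ok {
				return // the user closed source
			}
			select {
			case dest <- evt:
			case <-stop:
				return
			}
		}
	}
}

func main() {
	stop := make(chan struct{})
	source := make(chan GenericEvent)
	dest := make(chan GenericEvent, 1)
	go forward(stop, source, dest)

	source <- GenericEvent{Name: "a"}
	fmt.Println((<-dest).Name)

	close(stop)
	_, open := <-dest
	fmt.Println(open) // dest is closed after stop; source is left untouched
}
```

Because `source` is typed as receive-only inside `forward`, the compiler also rejects any attempt to close it there.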

@@ -139,6 +143,12 @@ func (cm *controllerManager) injectInto(i interface{}) error {
if _, err := inject.DoInformers(cm.informers, i); err != nil {
Contributor Author

I guess we need a fix here.
We expect it to inject every parameter that i implements an injector for, rather than returning as soon as an error occurs.

@lichuqiang lichuqiang force-pushed the master branch 2 times, most recently from 3d5cfcd to 039baf6 Compare June 9, 2018 10:19
@lichuqiang lichuqiang changed the title WIP: Implement the ChannelSource Implement the ChannelSource Jun 9, 2018
@k8s-ci-robot k8s-ci-robot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jun 9, 2018
}

// stop should have been injected before Start was called
if cs.stop == nil {
Contributor

This is a bug in the error you copied :P This should be InjectSourceChannel instead of DoStop


// InjectSourceChannel is internal and should be called only by the Controller.
// It is used to inject the source event channel initialized by the ControllerManager.
func (cs *ChannelSource) InjectSourceChannel(c chan event.GenericEvent) error {
Contributor

What do you think of having the user provide the channel when creating the object (like Type on KindSource)?

Private Injected fields are initialized by the ControllerManager. Public fields should be supplied by the user.

Contributor Author

@lichuqiang lichuqiang Jun 9, 2018

I'd say I haven't got enough knowledge on how users will use it.
I think you have more say over this point, so let's make it the way you suggested :)

// once ensures the event distribution goroutine will be performed only once
once sync.Once
// source is the source channel to fetch GenericEvents
source chan event.GenericEvent
Contributor

Wondering if this should be a read channel only. What do you think?

Contributor Author

good catch
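
For readers unfamiliar with directional channels, here is a small sketch of the read-only idea. The `ChannelSource` below is a cut-down illustration with a stand-in `GenericEvent` type and a hypothetical `next` method, not the merged type. The user keeps the bidirectional end; the source only holds a receive-only view, so it cannot send to or close the channel:

```go
package main

import "fmt"

// GenericEvent is a stand-in for event.GenericEvent (assumption for this sketch).
type GenericEvent struct{ Name string }

// ChannelSource is a cut-down illustration, not the type from the PR.
type ChannelSource struct {
	// Source is receive-only: the source can read events but cannot
	// send to or close the channel.
	Source <-chan GenericEvent
}

// next (hypothetical) blocks for one event; ok is false once the user
// has closed the channel and it is drained.
func (cs *ChannelSource) next() (GenericEvent, bool) {
	evt, ok := <-cs.Source
	return evt, ok
}

func main() {
	ch := make(chan GenericEvent, 1) // the user keeps the bidirectional end
	cs := &ChannelSource{Source: ch} // implicitly converted to <-chan GenericEvent
	ch <- GenericEvent{Name: "webhook"}
	evt, ok := cs.next()
	fmt.Println(evt.Name, ok)
	// close(cs.Source) would not compile: a receive-only channel cannot be closed.
}
```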

close(cs.source)

// Close destination channels
cs.destLock.Lock()
Contributor

not sure if this is a common pattern, but if we wrap this in an in-line function we can call defer unlock.

func() {
cs.destLock.Lock()
defer cs.destLock.Unlock()
for _, dst := range cs.dest {
  close(dst)
}
}()

Contributor Author

Graceful vs Easy to understand
I'm fine with both

Contributor

or

case <-cs.stop:
  cs.doStop()
case evt := <-cs.source:
  cs.DoEvent(evt)

Contributor Author

that makes sense

case evt := <-cs.source:
cs.destLock.Lock()
for _, dst := range cs.dest {
dst <- evt
Contributor

Should this be synchronous or asynchronous? With synchronous if one of the EventHandlers blocks, it will block all of them. Is there any advantage to this?

Contributor Author

@lichuqiang lichuqiang Jun 9, 2018

That's what I meant to discuss with you. I guess we might have to tolerate this.
If we put it in a goroutine, we'll face the race condition of writing a message to a closed channel; even with the lock here, that is out of our control.

Most obviously, you'll see an error in go vet if we make it asynchronous.

Contributor

I guess this is still asynchronous because of the buffer. Is the buffer a more common / better pattern? Please leave a comment with the information for future readers.

Contributor Author

yeah, I think the buffer should have been big enough normally, and if we still see it blocked, the controller is thought to be in an abnormal state.
Will add comment for clarification.
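
To make the buffer argument concrete, here is a small sketch under stated assumptions: `GenericEvent` is a stand-in type, `fanOut` is a hypothetical helper, and the buffer size of 2 is chosen only for illustration. The sends are plain synchronous sends, yet they return immediately as long as each destination still has buffer space, so a slow handler only stalls distribution once its buffer is full:

```go
package main

import "fmt"

// GenericEvent is a stand-in for event.GenericEvent (assumption for this sketch).
type GenericEvent struct{ ID int }

// fanOut (hypothetical) sends evt to every destination in order. Each send
// blocks only if that destination's buffer is already full.
func fanOut(evt GenericEvent, dests []chan GenericEvent) {
	for _, dst := range dests {
		dst <- evt
	}
}

func main() {
	const bufferSize = 2 // illustrative value, not the PR's default
	dests := []chan GenericEvent{
		make(chan GenericEvent, bufferSize),
		make(chan GenericEvent, bufferSize),
	}

	// No consumer is running yet, but two events fit in each buffer,
	// so neither call blocks.
	fanOut(GenericEvent{ID: 1}, dests)
	fanOut(GenericEvent{ID: 2}, dests)

	for i, dst := range dests {
		fmt.Printf("dest %d buffered %d of %d\n", i, len(dst), cap(dst))
	}
	// A third fanOut call would block here until some handler drains its channel.
}
```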


// DoSourceChannel will set source channel on i and return the result if it implements SourceChannel.
// Returns false if i does not implement SourceChannel.
func DoSourceChannel(ch chan event.GenericEvent, i interface{}) (bool, error) {
Contributor

Let's not inject this.

@@ -31,6 +32,11 @@ import (
logf "github.com/kubernetes-sigs/controller-runtime/pkg/runtime/log"
)

const (
// initialBufferSize is the initial number of event notifications that can be buffered.
initialBufferSize = 1024
Contributor

Let's make this a variable on the ChannelSource and write a test for what happens when we run out of buffer (e.g. we can set the buffer size to 1).

Contributor Author

So when do you think it should be specified?
Or we'll in fact always use the default value, and the field will only be changed in test?

@@ -203,4 +204,57 @@ var _ = Describe("Source", func() {
})
})
})
Describe("ChannelSource", func() {
Contributor

Add a test for when the ChannelSource is reused by multiple watches with different EventHandlers.

var c chan struct{}
var ch chan event.GenericEvent

BeforeEach(func() {
Contributor

Add tests for error conditions where the stop or channel are not specified.


corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Contributor

Let's also add a controller_integration_test.go case that exercises the end-to-end flow.

@pwittrock
Contributor

This looks good. A few minor comments.

@lichuqiang lichuqiang force-pushed the master branch 3 times, most recently from c421290 to e844cac Compare June 11, 2018 11:37
dest []chan event.GenericEvent

// DestBufferSize is the specified buffer size of dest channels
DestBufferSize int
Contributor

followup: Include the default value in the comment.

It("should block if exceed buffer size", func(done Done) {
ch := make(chan event.GenericEvent)
evt := event.GenericEvent{}
interval := 5 * time.Second
Contributor

This will make the test run a really long time. Instead of sleeping, consider waiting on a signal / channel.

},
GenericFunc: func(q2 workqueue.RateLimitingInterface, evt event.GenericEvent) {
defer GinkgoRecover()
time.Sleep(interval)
Contributor

Don't sleep, wait on a channel or something

distributeInterval := time.Now().Sub(beforeEvent)
Expect(distributeInterval >= interval).To(BeTrue())
close(done)
}, 15)
Contributor

keep the timeout at 1 second

<-c
close(done)
})
It("should block if exceed buffer size", func(done Done) {
Contributor

I think we care less that it blocks, and more that it doesn't blow up. I think the main thing we want to test is that we can write 3 events to the source channel while the eventhandler is blocked, then unblock the eventhandler, and all the 3 events will get processed.
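
The scenario described here can be sketched without any sleeps by signalling over channels. This is only an illustration with a stand-in `GenericEvent` type and a hypothetical `run` helper, not the test that was merged. The handler stays blocked until `unblock` is closed, the writer pushes events into a small buffer in the meantime, and the check is simply that every event arrives in order once the handler is released:

```go
package main

import "fmt"

// GenericEvent is a stand-in for event.GenericEvent (assumption for this sketch).
type GenericEvent struct{ ID int }

// run (hypothetical) writes n events while the handler is blocked, then
// unblocks the handler and returns the IDs it processed, in order.
// bufferSize and n must both be at least 1, otherwise the first send
// could never complete while the handler is still blocked.
func run(n, bufferSize int) []int {
	dest := make(chan GenericEvent, bufferSize)
	unblock := make(chan struct{})
	started := make(chan struct{})
	done := make(chan []int)

	// Handler: blocked until unblock is closed, then drains dest.
	go func() {
		<-unblock
		var ids []int
		for evt := range dest {
			ids = append(ids, evt.ID)
		}
		done <- ids
	}()

	// Writer: fills the buffer, then blocks on further sends until the
	// handler starts reading.
	go func() {
		for i := 1; i <= n; i++ {
			dest <- GenericEvent{ID: i}
			if i == 1 {
				close(started)
			}
		}
		close(dest)
	}()

	<-started      // at least one event is buffered while the handler is still blocked
	close(unblock) // release the handler
	return <-done
}

func main() {
	fmt.Println(run(3, 1)) // prints [1 2 3]
}
```

Waiting on `started` and `done` instead of sleeping keeps the run time independent of any timing interval, which is the point of the earlier review comments about test duration.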

@pwittrock
Contributor

lgtm once the test run time is reduced back down to sub-second

Contributor

@droot droot left a comment

Changes look ok. However I am still struggling to come up with a concrete use case :).

Thinking more about this.


// InjectStop is internal and should be called only by the Controller.
// It is used to inject the stop channel initialized by the ControllerManager.
func (cs *ChannelSource) InjectStop(stop <-chan struct{}) error {
Contributor

Better to name it explicitly: InjectStopChannel?


// Stop is used by the ControllerManager to inject stop channel into Sources, EventHandlers, Predicates, and
// Reconciles
type Stop interface {
Contributor

Stoppable ?

@@ -45,21 +51,129 @@ type Source interface {
Start(eventhandler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

var _ Source = &ChannelSource{}

// ChannelSource is used to provide a source of events originating outside the cluster
Contributor

I am trying to understand the correct usage pattern for the channel-source so that we can advise users accordingly.

For external triggers that map to changes in a type in K8s, the controller will pick them up automatically because it is watching that type.
What remains are the triggers that don't directly change any data in the type but still affect it; the channel source is the right fit for those.

We have to come up with a better example demonstrating this source.

Contributor Author

Yeah, this is of great importance. Maybe we can do that when constructing the sample controller.

@lichuqiang
Contributor Author

Rebased on the latest code; also, addressed the comments :)

Contributor

@droot droot left a comment

/lgtm

@k8s-ci-robot k8s-ci-robot added the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Jun 12, 2018
@pwittrock
Contributor

/approve

@pwittrock pwittrock added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Jun 12, 2018
@k8s-ci-robot k8s-ci-robot merged commit b5597f5 into kubernetes-sigs:master Jun 12, 2018