Skip to content

Implement the ChannelSource #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func (cm *controllerManager) SetFields(i interface{}) error {
if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
return err
}
if _, err := inject.StopChannelInto(cm.stop, i); err != nil {
return err
}
return nil
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/runtime/inject/inject.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ func SchemeInto(scheme *runtime.Scheme, i interface{}) (bool, error) {
return false, nil
}

// Stoppable is used by the ControllerManager to inject stop channel into Sources,
// EventHandlers, Predicates, and Reconciles.
type Stoppable interface {
InjectStopChannel(<-chan struct{}) error
}

// StopChannelInto will set stop channel on i and return the result if it implements Stoppable.
// Returns false if i does not implement Stoppable.
func StopChannelInto(stop <-chan struct{}, i interface{}) (bool, error) {
if s, ok := i.(Stoppable); ok {
return true, s.InjectStopChannel(stop)
}
return false, nil
}

// Func injects dependencies into i.
type Func func(i interface{}) error

Expand Down
6 changes: 5 additions & 1 deletion pkg/source/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,9 @@ func ExampleKind() {
// provided by the event.
func ExampleChannel() {
events := make(chan event.GenericEvent)
ctrl.Watch(source.Channel(events), &handler.Enqueue{})

ctrl.Watch(
&source.Channel{Source: events},
&handler.Enqueue{},
)
}
121 changes: 118 additions & 3 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package source

import (
"fmt"
"sync"

"github.com/kubernetes-sigs/controller-runtime/pkg/event"
"github.com/kubernetes-sigs/controller-runtime/pkg/handler"
Expand All @@ -30,6 +31,11 @@ import (
"github.com/kubernetes-sigs/controller-runtime/pkg/predicate"
)

const (
// defaultBufferSize is the default number of event notifications that can be buffered.
defaultBufferSize = 1024
)

// Source is a source of events (eh.g. Create, Update, Delete operations on Kubernetes Objects, Webhook callbacks, etc)
// which should be processed by event.EventHandlers to enqueue reconcile.Requests.
//
Expand All @@ -45,21 +51,130 @@ type Source interface {
Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

var _ Source = &Channel{}

// Channel is used to provide a source of events originating outside the cluster
// (eh.g. GitHub Webhook callback). Channel requires the user to wire the external
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
type Channel chan event.GenericEvent
type Channel struct {
// once ensures the event distribution goroutine will be performed only once
once sync.Once

// Source is the source channel to fetch GenericEvents
Source <-chan event.GenericEvent

// stop is to end ongoing goroutine, and close the channels
stop <-chan struct{}

// dest is the destination channels of the added event handlers
dest []chan event.GenericEvent

// DestBufferSize is the specified buffer size of dest channels.
// Default to 1024 if not specified.
DestBufferSize int

// destLock is to ensure the destination channels are safely added/removed
destLock sync.Mutex
}

var _ inject.Stoppable = &Channel{}

// InjectStopChannel is internal should be called only by the Controller.
// It is used to inject the stop channel initialized by the ControllerManager.
func (cs *Channel) InjectStopChannel(stop <-chan struct{}) error {
if cs.stop == nil {
cs.stop = stop
}

var _ Source = Channel(make(chan event.GenericEvent))
return nil
}

// Start implements Source and should only be called by the Controller.
func (ks Channel) Start(
func (cs *Channel) Start(
handler handler.EventHandler,
queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Source should have been specified by the user.
if cs.Source == nil {
return fmt.Errorf("must specify Channel.Source")
}

// stop should have been injected before Start was called
if cs.stop == nil {
return fmt.Errorf("must call InjectStop on Channel before calling Start")
}

// use default value if DestBufferSize not specified
if cs.DestBufferSize == 0 {
cs.DestBufferSize = defaultBufferSize
}

cs.once.Do(func() {
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
go cs.syncLoop()
})

dst := make(chan event.GenericEvent, cs.DestBufferSize)
go func() {
for evt := range dst {
shouldHandle := true
for _, p := range prct {
if !p.Generic(evt) {
shouldHandle = false
break
}
}

if shouldHandle {
handler.Generic(queue, evt)
}
}
}()

cs.destLock.Lock()
defer cs.destLock.Unlock()

cs.dest = append(cs.dest, dst)

return nil
}

func (cs *Channel) doStop() {
cs.destLock.Lock()
defer cs.destLock.Unlock()

for _, dst := range cs.dest {
close(dst)
}
}

func (cs *Channel) distribute(evt event.GenericEvent) {
cs.destLock.Lock()
defer cs.destLock.Unlock()

for _, dst := range cs.dest {
// We cannot make it under goroutine here, or we'll meet the
// race condition of writing message to closed channels.
// To avoid blocking, the dest channels are expected to be of
// proper buffer size. If we still see it blocked, then
// the controller is thought to be in an abnormal state.
dst <- evt
}
}

func (cs *Channel) syncLoop() {
for {
select {
case <-cs.stop:
// Close destination channels
cs.doStop()
return
case evt := <-cs.Source:
cs.distribute(evt)
}
}
}

// Kind is used to provide a source of events originating inside the cluster from Watches (eh.g. Pod Create)
type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Expand Down
Loading