Skip to content

Commit a4e8aca

Browse files
committed
move Source definitions to a single folder without breaking the API
Signed-off-by: Tim Ramlot <[email protected]>
1 parent bbe3bbe commit a4e8aca

File tree

9 files changed

+292
-169
lines changed

9 files changed

+292
-169
lines changed

pkg/builder/controller.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
3131
"sigs.k8s.io/controller-runtime/pkg/controller"
3232
"sigs.k8s.io/controller-runtime/pkg/handler"
33-
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
3433
"sigs.k8s.io/controller-runtime/pkg/manager"
3534
"sigs.k8s.io/controller-runtime/pkg/predicate"
3635
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -313,12 +312,14 @@ func (blder *Builder) doWatch() error {
313312
}
314313
for _, w := range blder.watchesInput {
315314
// If the source of this watch is of type Kind, project it.
316-
if srcKind, ok := w.src.(*internalsource.Kind); ok {
317-
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
318-
if err != nil {
315+
if srcKind, ok := w.src.(interface {
316+
ProjectObject(func(client.Object) (client.Object, error)) error
317+
}); ok {
318+
if err := srcKind.ProjectObject(func(o client.Object) (client.Object, error) {
319+
return blder.project(o, w.objectProjection)
320+
}); err != nil {
319321
return err
320322
}
321-
srcKind.Type = typeForSrc
322323
}
323324
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
324325
allPredicates = append(allPredicates, w.predicates...)

pkg/source/internal/channel.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync"
23+
24+
"k8s.io/client-go/util/workqueue"
25+
"sigs.k8s.io/controller-runtime/pkg/event"
26+
"sigs.k8s.io/controller-runtime/pkg/handler"
27+
"sigs.k8s.io/controller-runtime/pkg/predicate"
28+
)
29+
30+
const (
31+
// defaultBufferSize is the default number of event notifications that can be buffered.
32+
defaultBufferSize = 1024
33+
)
34+
35+
// Channel is used to provide a source of events originating outside the cluster
36+
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external
37+
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
38+
type Channel struct {
39+
// once ensures the event distribution goroutine will be performed only once
40+
once sync.Once
41+
42+
// Source is the source channel to fetch GenericEvents
43+
Source <-chan event.GenericEvent
44+
45+
// dest is the destination channels of the added event handlers
46+
dest []chan event.GenericEvent
47+
48+
// DestBufferSize is the specified buffer size of dest channels.
49+
// Default to 1024 if not specified.
50+
DestBufferSize int
51+
52+
// destLock is to ensure the destination channels are safely added/removed
53+
destLock sync.Mutex
54+
}
55+
56+
func (cs *Channel) String() string {
57+
return fmt.Sprintf("channel source: %p", cs)
58+
}
59+
60+
// Start implements Source and should only be called by the Controller.
61+
func (cs *Channel) Start(
62+
ctx context.Context,
63+
handler handler.EventHandler,
64+
queue workqueue.RateLimitingInterface,
65+
prct ...predicate.Predicate) error {
66+
// Source should have been specified by the user.
67+
if cs.Source == nil {
68+
return fmt.Errorf("must specify Channel.Source")
69+
}
70+
71+
// use default value if DestBufferSize not specified
72+
if cs.DestBufferSize == 0 {
73+
cs.DestBufferSize = defaultBufferSize
74+
}
75+
76+
dst := make(chan event.GenericEvent, cs.DestBufferSize)
77+
78+
cs.destLock.Lock()
79+
cs.dest = append(cs.dest, dst)
80+
cs.destLock.Unlock()
81+
82+
cs.once.Do(func() {
83+
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
84+
go cs.syncLoop(ctx)
85+
})
86+
87+
go func() {
88+
for evt := range dst {
89+
shouldHandle := true
90+
for _, p := range prct {
91+
if !p.Generic(evt) {
92+
shouldHandle = false
93+
break
94+
}
95+
}
96+
97+
if shouldHandle {
98+
func() {
99+
ctx, cancel := context.WithCancel(ctx)
100+
defer cancel()
101+
handler.Generic(ctx, evt, queue)
102+
}()
103+
}
104+
}
105+
}()
106+
107+
return nil
108+
}
109+
110+
func (cs *Channel) doStop() {
111+
cs.destLock.Lock()
112+
defer cs.destLock.Unlock()
113+
114+
for _, dst := range cs.dest {
115+
close(dst)
116+
}
117+
}
118+
119+
func (cs *Channel) distribute(evt event.GenericEvent) {
120+
cs.destLock.Lock()
121+
defer cs.destLock.Unlock()
122+
123+
for _, dst := range cs.dest {
124+
// We cannot make it under goroutine here, or we'll meet the
125+
// race condition of writing message to closed channels.
126+
// To avoid blocking, the dest channels are expected to be of
127+
// proper buffer size. If we still see it blocked, then
128+
// the controller is thought to be in an abnormal state.
129+
dst <- evt
130+
}
131+
}
132+
133+
func (cs *Channel) syncLoop(ctx context.Context) {
134+
for {
135+
select {
136+
case <-ctx.Done():
137+
// Close destination channels
138+
cs.doStop()
139+
return
140+
case evt, stillOpen := <-cs.Source:
141+
if !stillOpen {
142+
// if the source channel is closed, we're never gonna get
143+
// anything more on it, so stop & bail
144+
cs.doStop()
145+
return
146+
}
147+
cs.distribute(evt)
148+
}
149+
}
150+
}

pkg/source/internal/func.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
"k8s.io/client-go/util/workqueue"
24+
"sigs.k8s.io/controller-runtime/pkg/handler"
25+
"sigs.k8s.io/controller-runtime/pkg/predicate"
26+
)
27+
28+
// Func is a function that implements Source.
29+
type Func func(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
30+
31+
// Start implements Source.
32+
func (f Func) Start(ctx context.Context, evt handler.EventHandler, queue workqueue.RateLimitingInterface,
33+
pr ...predicate.Predicate) error {
34+
return f(ctx, evt, queue, pr...)
35+
}
36+
37+
func (f Func) String() string {
38+
return fmt.Sprintf("func source: %p", f)
39+
}

pkg/source/internal/informer.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
"k8s.io/client-go/util/workqueue"
24+
"sigs.k8s.io/controller-runtime/pkg/cache"
25+
"sigs.k8s.io/controller-runtime/pkg/handler"
26+
"sigs.k8s.io/controller-runtime/pkg/predicate"
27+
)
28+
29+
// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
30+
type Informer struct {
31+
// Informer is the controller-runtime Informer
32+
Informer cache.Informer
33+
}
34+
35+
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
36+
// to enqueue reconcile.Requests.
37+
func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
38+
prct ...predicate.Predicate) error {
39+
// Informer should have been specified by the user.
40+
if is.Informer == nil {
41+
return fmt.Errorf("must specify Informer.Informer")
42+
}
43+
44+
_, err := is.Informer.AddEventHandler(NewEventHandler(ctx, queue, handler, prct).HandlerFuncs())
45+
if err != nil {
46+
return err
47+
}
48+
return nil
49+
}
50+
51+
func (is *Informer) String() string {
52+
return fmt.Sprintf("informer source: %p", is.Informer)
53+
}

pkg/internal/source/internal_test.go renamed to pkg/source/internal/internal_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
"k8s.io/client-go/util/workqueue"
2626
"sigs.k8s.io/controller-runtime/pkg/event"
2727
"sigs.k8s.io/controller-runtime/pkg/handler"
28-
internal "sigs.k8s.io/controller-runtime/pkg/internal/source"
28+
internal "sigs.k8s.io/controller-runtime/pkg/source/internal"
2929

3030
corev1 "k8s.io/api/core/v1"
3131
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

pkg/internal/source/kind.go renamed to pkg/source/internal/kind.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
117
package internal
218

319
import (
@@ -94,6 +110,23 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
94110
return nil
95111
}
96112

113+
// ProjectObject sets the Kind's object to the given object.
114+
// This function should only be called by the Controller builder.
115+
// NOTE: make sure to update pkg/builder/controller.go if you change this function.
116+
func (ks *Kind) ProjectObject(fn func(client.Object) (client.Object, error)) error {
117+
if ks.startCancel != nil {
118+
return fmt.Errorf("cannot project object after Start has been called")
119+
}
120+
121+
newType, err := fn(ks.Type)
122+
if err != nil {
123+
return err
124+
}
125+
126+
ks.Type = newType
127+
return nil
128+
}
129+
97130
func (ks *Kind) String() string {
98131
if ks.Type != nil {
99132
return fmt.Sprintf("kind source: %T", ks.Type)

0 commit comments

Comments
 (0)