Skip to content

Commit 22f4a15

Browse files
committed
move all Source implementations to a single folder
Signed-off-by: Tim Ramlot <[email protected]>
1 parent 4f00207 commit 22f4a15

File tree

8 files changed

+286
-173
lines changed

8 files changed

+286
-173
lines changed

pkg/source/internal/channel.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"sync"
24+
25+
"k8s.io/client-go/util/workqueue"
26+
"k8s.io/utils/ptr"
27+
"sigs.k8s.io/controller-runtime/pkg/event"
28+
"sigs.k8s.io/controller-runtime/pkg/handler"
29+
"sigs.k8s.io/controller-runtime/pkg/predicate"
30+
)
31+
32+
// Channel is used to provide a source of events originating outside the cluster
33+
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external
34+
// source (e.g. http handler) to write GenericEvents to the underlying channel.
35+
type Channel[T any] struct {
36+
// once ensures the event distribution goroutine will be performed only once
37+
once sync.Once
38+
39+
// source is the source channel to fetch GenericEvents
40+
Source <-chan event.TypedGenericEvent[T]
41+
42+
Handler handler.TypedEventHandler[T]
43+
44+
Predicates []predicate.TypedPredicate[T]
45+
46+
BufferSize *int
47+
48+
// dest is the destination channels of the added event handlers
49+
dest []chan event.TypedGenericEvent[T]
50+
51+
// destLock is to ensure the destination channels are safely added/removed
52+
destLock sync.Mutex
53+
}
54+
55+
func (cs *Channel[T]) String() string {
56+
return fmt.Sprintf("channel source: %p", cs)
57+
}
58+
59+
// Start implements Source and should only be called by the Controller.
60+
func (cs *Channel[T]) Start(
61+
ctx context.Context,
62+
queue workqueue.RateLimitingInterface,
63+
) error {
64+
// Source should have been specified by the user.
65+
if cs.Source == nil {
66+
return fmt.Errorf("must specify Channel.Source")
67+
}
68+
if cs.Handler == nil {
69+
return errors.New("must specify Channel.Handler")
70+
}
71+
72+
if cs.BufferSize == nil {
73+
cs.BufferSize = ptr.To(1024)
74+
}
75+
76+
dst := make(chan event.TypedGenericEvent[T], *cs.BufferSize)
77+
78+
cs.destLock.Lock()
79+
cs.dest = append(cs.dest, dst)
80+
cs.destLock.Unlock()
81+
82+
cs.once.Do(func() {
83+
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
84+
go cs.syncLoop(ctx)
85+
})
86+
87+
go func() {
88+
for evt := range dst {
89+
shouldHandle := true
90+
for _, p := range cs.Predicates {
91+
if !p.Generic(evt) {
92+
shouldHandle = false
93+
break
94+
}
95+
}
96+
97+
if shouldHandle {
98+
func() {
99+
ctx, cancel := context.WithCancel(ctx)
100+
defer cancel()
101+
cs.Handler.Generic(ctx, evt, queue)
102+
}()
103+
}
104+
}
105+
}()
106+
107+
return nil
108+
}
109+
110+
func (cs *Channel[T]) doStop() {
111+
cs.destLock.Lock()
112+
defer cs.destLock.Unlock()
113+
114+
for _, dst := range cs.dest {
115+
close(dst)
116+
}
117+
}
118+
119+
func (cs *Channel[T]) distribute(evt event.TypedGenericEvent[T]) {
120+
cs.destLock.Lock()
121+
defer cs.destLock.Unlock()
122+
123+
for _, dst := range cs.dest {
124+
// We cannot make it under goroutine here, or we'll meet the
125+
// race condition of writing message to closed channels.
126+
// To avoid blocking, the dest channels are expected to be of
127+
// proper buffer size. If we still see it blocked, then
128+
// the controller is thought to be in an abnormal state.
129+
dst <- evt
130+
}
131+
}
132+
133+
func (cs *Channel[T]) syncLoop(ctx context.Context) {
134+
for {
135+
select {
136+
case <-ctx.Done():
137+
// Close destination channels
138+
cs.doStop()
139+
return
140+
case evt, stillOpen := <-cs.Source:
141+
if !stillOpen {
142+
// if the source channel is closed, we're never gonna get
143+
// anything more on it, so stop & bail
144+
cs.doStop()
145+
return
146+
}
147+
cs.distribute(evt)
148+
}
149+
}
150+
}

pkg/source/internal/func.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
"k8s.io/client-go/util/workqueue"
24+
)
25+
26+
// Func is a function that implements Source.
27+
type Func func(context.Context, workqueue.RateLimitingInterface) error
28+
29+
// Start implements Source.
30+
func (f Func) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
31+
return f(ctx, queue)
32+
}
33+
34+
func (f Func) String() string {
35+
return fmt.Sprintf("func source: %p", f)
36+
}

pkg/source/internal/informer.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
24+
"k8s.io/client-go/util/workqueue"
25+
"sigs.k8s.io/controller-runtime/pkg/cache"
26+
"sigs.k8s.io/controller-runtime/pkg/handler"
27+
"sigs.k8s.io/controller-runtime/pkg/predicate"
28+
)
29+
30+
// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
31+
type Informer struct {
32+
// Informer is the controller-runtime Informer
33+
Informer cache.Informer
34+
Handler handler.EventHandler
35+
Predicates []predicate.Predicate
36+
}
37+
38+
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
39+
// to enqueue reconcile.Requests.
40+
func (is *Informer) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
41+
// Informer should have been specified by the user.
42+
if is.Informer == nil {
43+
return fmt.Errorf("must specify Informer.Informer")
44+
}
45+
if is.Handler == nil {
46+
return errors.New("must specify Informer.Handler")
47+
}
48+
49+
_, err := is.Informer.AddEventHandler(NewEventHandler(ctx, queue, is.Handler, is.Predicates).HandlerFuncs())
50+
if err != nil {
51+
return err
52+
}
53+
return nil
54+
}
55+
56+
func (is *Informer) String() string {
57+
return fmt.Sprintf("informer source: %p", is.Informer)
58+
}

pkg/internal/source/internal_test.go renamed to pkg/source/internal/internal_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"sigs.k8s.io/controller-runtime/pkg/client"
2727
"sigs.k8s.io/controller-runtime/pkg/event"
2828
"sigs.k8s.io/controller-runtime/pkg/handler"
29-
internal "sigs.k8s.io/controller-runtime/pkg/internal/source"
29+
internal "sigs.k8s.io/controller-runtime/pkg/source/internal"
3030

3131
corev1 "k8s.io/api/core/v1"
3232
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

pkg/internal/source/kind.go renamed to pkg/source/internal/kind.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
117
package internal
218

319
import (

0 commit comments

Comments
 (0)