Skip to content

Commit 442d3ca

Browse files
authored
Merge pull request #1460 from alvaroaleman/watch
✨ Add ClientWithWatch for use in CLIs
2 parents 5031262 + 45aa968 commit 442d3ca

File tree

4 files changed

+262
-0
lines changed

4 files changed

+262
-0
lines changed

pkg/client/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ type Options struct {
5353
// case of unstructured types, the group, version, and kind will be extracted
5454
// from the corresponding fields on the object.
5555
func New(config *rest.Config, options Options) (Client, error) {
56+
return newClient(config, options)
57+
}
58+
59+
func newClient(config *rest.Config, options Options) (*client, error) {
5660
if config == nil {
5761
return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
5862
}

pkg/client/interfaces.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"k8s.io/apimachinery/pkg/api/meta"
2525
"k8s.io/apimachinery/pkg/runtime"
2626
"k8s.io/apimachinery/pkg/types"
27+
"k8s.io/apimachinery/pkg/watch"
2728
)
2829

2930
// ObjectKey identifies a Kubernetes Object.
@@ -108,6 +109,14 @@ type Client interface {
108109
RESTMapper() meta.RESTMapper
109110
}
110111

112+
// WithWatch supports Watch on top of the CRUD operations supported by
113+
// the normal Client. Its intended use-case are CLI apps that need to wait for
114+
// events.
115+
type WithWatch interface {
116+
Client
117+
Watch(ctx context.Context, obj ObjectList, opts ...ListOption) (watch.Interface, error)
118+
}
119+
111120
// IndexerFunc knows how to take an object and turn it into a series
112121
// of non-namespaced keys. Namespaced objects are automatically given
113122
// namespaced and non-spaced variants, so keys do not need to include namespace.

pkg/client/watch.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package client
18+
19+
import (
20+
"context"
21+
"strings"
22+
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
25+
"k8s.io/apimachinery/pkg/watch"
26+
"k8s.io/client-go/dynamic"
27+
"k8s.io/client-go/rest"
28+
)
29+
30+
// NewWithWatch returns a new WithWatch.
31+
func NewWithWatch(config *rest.Config, options Options) (WithWatch, error) {
32+
client, err := newClient(config, options)
33+
if err != nil {
34+
return nil, err
35+
}
36+
dynamicClient, err := dynamic.NewForConfig(config)
37+
if err != nil {
38+
return nil, err
39+
}
40+
return &watchingClient{client: client, dynamic: dynamicClient}, nil
41+
}
42+
43+
type watchingClient struct {
44+
*client
45+
dynamic dynamic.Interface
46+
}
47+
48+
func (w *watchingClient) Watch(ctx context.Context, list ObjectList, opts ...ListOption) (watch.Interface, error) {
49+
switch l := list.(type) {
50+
case *unstructured.UnstructuredList:
51+
return w.unstructuredWatch(ctx, l, opts...)
52+
case *metav1.PartialObjectMetadataList:
53+
return w.metadataWatch(ctx, l, opts...)
54+
default:
55+
return w.typedWatch(ctx, l, opts...)
56+
}
57+
}
58+
59+
func (w *watchingClient) listOpts(opts ...ListOption) ListOptions {
60+
listOpts := ListOptions{}
61+
listOpts.ApplyOptions(opts)
62+
if listOpts.Raw == nil {
63+
listOpts.Raw = &metav1.ListOptions{}
64+
}
65+
listOpts.Raw.Watch = true
66+
67+
return listOpts
68+
}
69+
70+
func (w *watchingClient) metadataWatch(ctx context.Context, obj *metav1.PartialObjectMetadataList, opts ...ListOption) (watch.Interface, error) {
71+
gvk := obj.GroupVersionKind()
72+
if strings.HasSuffix(gvk.Kind, "List") {
73+
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
74+
}
75+
76+
listOpts := w.listOpts(opts...)
77+
78+
resInt, err := w.client.metadataClient.getResourceInterface(gvk, listOpts.Namespace)
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
return resInt.Watch(ctx, *listOpts.AsListOptions())
84+
}
85+
86+
func (w *watchingClient) unstructuredWatch(ctx context.Context, obj *unstructured.UnstructuredList, opts ...ListOption) (watch.Interface, error) {
87+
gvk := obj.GroupVersionKind()
88+
if strings.HasSuffix(gvk.Kind, "List") {
89+
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
90+
}
91+
92+
r, err := w.client.unstructuredClient.cache.getResource(obj)
93+
if err != nil {
94+
return nil, err
95+
}
96+
97+
listOpts := w.listOpts(opts...)
98+
99+
if listOpts.Namespace != "" && r.isNamespaced() {
100+
return w.dynamic.Resource(r.mapping.Resource).Namespace(listOpts.Namespace).Watch(ctx, *listOpts.AsListOptions())
101+
}
102+
return w.dynamic.Resource(r.mapping.Resource).Watch(ctx, *listOpts.AsListOptions())
103+
}
104+
105+
func (w *watchingClient) typedWatch(ctx context.Context, obj ObjectList, opts ...ListOption) (watch.Interface, error) {
106+
r, err := w.client.typedClient.cache.getResource(obj)
107+
if err != nil {
108+
return nil, err
109+
}
110+
111+
listOpts := w.listOpts(opts...)
112+
113+
return r.Get().
114+
NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()).
115+
Resource(r.resource()).
116+
VersionedParams(listOpts.AsListOptions(), w.client.typedClient.paramCodec).
117+
Watch(ctx)
118+
}

pkg/client/watch_test.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package client_test
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync/atomic"
23+
24+
. "github.com/onsi/ginkgo"
25+
. "github.com/onsi/gomega"
26+
appsv1 "k8s.io/api/apps/v1"
27+
corev1 "k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
30+
"k8s.io/apimachinery/pkg/fields"
31+
"k8s.io/apimachinery/pkg/runtime/schema"
32+
"k8s.io/apimachinery/pkg/watch"
33+
"sigs.k8s.io/controller-runtime/pkg/client"
34+
)
35+
36+
var _ = Describe("ClientWithWatch", func() {
37+
var dep *appsv1.Deployment
38+
var count uint64 = 0
39+
var replicaCount int32 = 2
40+
var ns = "kube-public"
41+
ctx := context.TODO()
42+
43+
BeforeEach(func(done Done) {
44+
atomic.AddUint64(&count, 1)
45+
dep = &appsv1.Deployment{
46+
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("watch-deployment-name-%v", count), Namespace: ns, Labels: map[string]string{"app": fmt.Sprintf("bar-%v", count)}},
47+
Spec: appsv1.DeploymentSpec{
48+
Replicas: &replicaCount,
49+
Selector: &metav1.LabelSelector{
50+
MatchLabels: map[string]string{"foo": "bar"},
51+
},
52+
Template: corev1.PodTemplateSpec{
53+
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
54+
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
55+
},
56+
},
57+
}
58+
59+
var err error
60+
dep, err = clientset.AppsV1().Deployments(ns).Create(ctx, dep, metav1.CreateOptions{})
61+
Expect(err).NotTo(HaveOccurred())
62+
close(done)
63+
}, serverSideTimeoutSeconds)
64+
65+
AfterEach(func(done Done) {
66+
deleteDeployment(ctx, dep, ns)
67+
close(done)
68+
}, serverSideTimeoutSeconds)
69+
70+
Describe("NewWithWatch", func() {
71+
It("should return a new Client", func(done Done) {
72+
cl, err := client.NewWithWatch(cfg, client.Options{})
73+
Expect(err).NotTo(HaveOccurred())
74+
Expect(cl).NotTo(BeNil())
75+
76+
close(done)
77+
})
78+
79+
watchSuite := func(through client.ObjectList, expectedType client.Object) {
80+
cl, err := client.NewWithWatch(cfg, client.Options{})
81+
Expect(err).NotTo(HaveOccurred())
82+
Expect(cl).NotTo(BeNil())
83+
84+
watchInterface, err := cl.Watch(ctx, through, &client.ListOptions{
85+
FieldSelector: fields.OneTermEqualSelector("metadata.name", dep.Name),
86+
Namespace: dep.Namespace,
87+
})
88+
Expect(err).NotTo(HaveOccurred())
89+
Expect(watchInterface).NotTo(BeNil())
90+
91+
defer watchInterface.Stop()
92+
93+
event, ok := <-watchInterface.ResultChan()
94+
Expect(ok).To(BeTrue())
95+
Expect(event.Type).To(BeIdenticalTo(watch.Added))
96+
Expect(event.Object).To(BeAssignableToTypeOf(expectedType))
97+
98+
// The metadata client doesn't set GVK so we just use the
99+
// name and UID as a proxy to confirm that we got the right
100+
// object.
101+
metaObject, ok := event.Object.(metav1.Object)
102+
Expect(ok).To(BeTrue())
103+
Expect(metaObject.GetName()).To(Equal(dep.Name))
104+
Expect(metaObject.GetUID()).To(Equal(dep.UID))
105+
106+
}
107+
108+
It("should receive a create event when watching the typed object", func(done Done) {
109+
watchSuite(&appsv1.DeploymentList{}, &appsv1.Deployment{})
110+
close(done)
111+
}, 15)
112+
113+
It("should receive a create event when watching the unstructured object", func(done Done) {
114+
u := &unstructured.UnstructuredList{}
115+
u.SetGroupVersionKind(schema.GroupVersionKind{
116+
Group: "apps",
117+
Kind: "Deployment",
118+
Version: "v1",
119+
})
120+
watchSuite(u, &unstructured.Unstructured{})
121+
close(done)
122+
}, 15)
123+
124+
It("should receive a create event when watching the metadata object", func(done Done) {
125+
m := &metav1.PartialObjectMetadataList{TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: "apps/v1"}}
126+
watchSuite(m, &metav1.PartialObjectMetadata{})
127+
close(done)
128+
}, 15)
129+
})
130+
131+
})

0 commit comments

Comments
 (0)