Skip to content
This repository was archived by the owner on Apr 24, 2024. It is now read-only.

Commit 1899c14

Browse files
committed
Use client-go retry watcher to watch for APIExport
1 parent 51a87fc commit 1899c14

File tree

1 file changed

+63
-35
lines changed

1 file changed

+63
-35
lines changed

main.go

Lines changed: 63 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,16 @@ import (
2222
"fmt"
2323
"os"
2424

25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2527
"k8s.io/apimachinery/pkg/fields"
2628
"k8s.io/apimachinery/pkg/runtime"
2729
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30+
"k8s.io/apimachinery/pkg/watch"
2831
"k8s.io/client-go/discovery"
2932
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3033
"k8s.io/client-go/rest"
34+
retrywatch "k8s.io/client-go/tools/watch"
3135
"k8s.io/klog/v2"
3236

3337
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
@@ -170,51 +174,49 @@ func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName
170174
return nil, fmt.Errorf("error creating APIExport client: %w", err)
171175
}
172176

173-
selector := client.MatchingFieldsSelector{
174-
Selector: fields.OneTermEqualSelector("metadata.name", apiExportName),
175-
}
176-
watch, err := apiExportClient.Watch(ctx, &apisv1alpha1.APIExportList{}, selector)
177+
list := &apisv1alpha1.APIExportList{}
178+
selector := fields.OneTermEqualSelector("metadata.name", apiExportName)
179+
err = apiExportClient.List(ctx, list, client.MatchingFieldsSelector{Selector: selector})
177180
if err != nil {
178181
return nil, fmt.Errorf("error watching for APIExport: %w", err)
179182
}
183+
if len(list.Items) > 0 && isAPIExportReady(&list.Items[0]) {
184+
cfg = rest.CopyConfig(cfg)
185+
// TODO: sharding support
186+
cfg.Host = list.Items[0].Status.VirtualWorkspaces[0].URL
187+
return cfg, nil
188+
}
189+
190+
setupLog.Info("Watching for APIExport to become ready", "name", apiExportName)
191+
192+
rw, err := retrywatch.NewRetryWatcher(list.ResourceVersion, watcher(apiExportClient.Watch).FilteredBy(selector))
193+
if err != nil {
194+
return nil, fmt.Errorf("error creating retry watcher for APIExport: %w", err)
195+
}
196+
defer rw.Stop()
180197

181198
for {
182199
select {
183200
case <-ctx.Done():
184-
watch.Stop()
185201
return nil, ctx.Err()
186-
case e, ok := <-watch.ResultChan():
187-
if !ok {
188-
// The channel has been closed. Let's retry watching in case it timed out on idle,
189-
// or fail in case connection to the server cannot be re-established.
190-
watch, err = apiExportClient.Watch(ctx, &apisv1alpha1.APIExportList{}, selector)
191-
if err != nil {
192-
return nil, fmt.Errorf("error watching for APIExport: %w", err)
202+
case e := <-rw.ResultChan():
203+
switch e.Type {
204+
case watch.Error:
205+
return nil, fmt.Errorf("error watching for APIExport: %w", apierrors.FromObject(e.Object))
206+
207+
case watch.Added, watch.Modified:
208+
apiExport, ok := e.Object.(*apisv1alpha1.APIExport)
209+
if !ok {
210+
return nil, fmt.Errorf("unexpected event object: %v", e.Object)
193211
}
212+
if !isAPIExportReady(apiExport) {
213+
continue
214+
}
215+
cfg = rest.CopyConfig(cfg)
216+
// TODO: sharding support
217+
cfg.Host = apiExport.Status.VirtualWorkspaces[0].URL
218+
return cfg, nil
194219
}
195-
196-
apiExport, ok := e.Object.(*apisv1alpha1.APIExport)
197-
if !ok {
198-
continue
199-
}
200-
201-
setupLog.Info("APIExport event received", "name", apiExport.Name, "event", e.Type)
202-
203-
if !conditions.IsTrue(apiExport, apisv1alpha1.APIExportVirtualWorkspaceURLsReady) {
204-
setupLog.Info("APIExport virtual workspace URLs are not ready", "APIExport", apiExport.Name)
205-
continue
206-
}
207-
208-
if len(apiExport.Status.VirtualWorkspaces) == 0 {
209-
setupLog.Info("APIExport does not have any virtual workspace URLs", "APIExport", apiExport.Name)
210-
continue
211-
}
212-
213-
setupLog.Info("Using APIExport to configure client", "APIExport", apiExport.Name)
214-
cfg = rest.CopyConfig(cfg)
215-
// TODO(ncdc): sharding support
216-
cfg.Host = apiExport.Status.VirtualWorkspaces[0].URL
217-
return cfg, nil
218220
}
219221
}
220222
}
@@ -242,3 +244,29 @@ func kcpAPIsGroupPresent(restConfig *rest.Config) bool {
242244
}
243245
return false
244246
}
247+
248+
func isAPIExportReady(apiExport *apisv1alpha1.APIExport) bool {
249+
if !conditions.IsTrue(apiExport, apisv1alpha1.APIExportVirtualWorkspaceURLsReady) {
250+
setupLog.Info("APIExport virtual workspace URLs are not ready", "APIExport", apiExport.Name)
251+
return false
252+
}
253+
254+
if len(apiExport.Status.VirtualWorkspaces) == 0 {
255+
setupLog.Info("APIExport does not have any virtual workspace URLs", "APIExport", apiExport.Name)
256+
return false
257+
}
258+
259+
return true
260+
}
261+
262+
type watcher func(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) (watch.Interface, error)
263+
264+
func (w watcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
265+
return w(context.TODO(), &apisv1alpha1.APIExportList{}, &client.ListOptions{Raw: &options})
266+
}
267+
268+
func (w watcher) FilteredBy(selector fields.Selector) watcher {
269+
return func(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) (watch.Interface, error) {
270+
return w(ctx, obj, append(opts, client.MatchingFieldsSelector{Selector: selector})...)
271+
}
272+
}

0 commit comments

Comments
 (0)