Skip to content
This repository was archived by the owner on Apr 24, 2024. It is now read-only.

✨Wait until APIExport virtual workspace URLs are ready #33

Merged
merged 8 commits into from
Dec 2, 2022
120 changes: 81 additions & 39 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,35 @@ import (
"fmt"
"os"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
retrywatch "k8s.io/client-go/tools/watch"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/kcp"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

datav1alpha1 "github.com/kcp-dev/controller-runtime-example/api/v1alpha1"
apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
"github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions"

// +kubebuilder:scaffold:imports

datav1alpha1 "github.com/kcp-dev/controller-runtime-example/api/v1alpha1"
"github.com/kcp-dev/controller-runtime-example/controllers"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
)

var (
Expand All @@ -67,7 +72,7 @@ func main() {
var enableLeaderElection bool
var probeAddr string
var apiExportName string
flag.StringVar(&apiExportName, "api-export-name", "", "The name of the APIExport.")
flag.StringVar(&apiExportName, "api-export-name", "data.my.domain", "The name of the APIExport.")
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
Expand Down Expand Up @@ -161,48 +166,59 @@ func main() {
// +kubebuilder:rbac:groups="apis.kcp.dev",resources=apiexports,verbs=get;list;watch

// restConfigForAPIExport returns a *rest.Config properly configured to communicate with the endpoint for the
// APIExport's virtual workspace.
// APIExport's virtual workspace. It blocks until the controller APIExport VirtualWorkspaceURLsReady condition
// becomes truthy, which happens when the APIExport is bound for the first time.
func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName string) (*rest.Config, error) {
scheme := runtime.NewScheme()
if err := apisv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("error adding apis.kcp.dev/v1alpha1 to scheme: %w", err)
}

apiExportClient, err := client.New(cfg, client.Options{Scheme: scheme})
apiExportClient, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme})
if err != nil {
return nil, fmt.Errorf("error creating APIExport client: %w", err)
}

var apiExport apisv1alpha1.APIExport

if apiExportName != "" {
if err := apiExportClient.Get(ctx, types.NamespacedName{Name: apiExportName}, &apiExport); err != nil {
return nil, fmt.Errorf("error getting APIExport %q: %w", apiExportName, err)
}
} else {
setupLog.Info("api-export-name is empty - listing")
exports := &apisv1alpha1.APIExportList{}
if err := apiExportClient.List(ctx, exports); err != nil {
return nil, fmt.Errorf("error listing APIExports: %w", err)
}
if len(exports.Items) == 0 {
return nil, fmt.Errorf("no APIExport found")
}
if len(exports.Items) > 1 {
return nil, fmt.Errorf("more than one APIExport found")
}
apiExport = exports.Items[0]
list := &apisv1alpha1.APIExportList{}
selector := fields.OneTermEqualSelector("metadata.name", apiExportName)
err = apiExportClient.List(ctx, list, client.MatchingFieldsSelector{Selector: selector})
if err != nil {
return nil, fmt.Errorf("error watching for APIExport: %w", err)
}

if len(apiExport.Status.VirtualWorkspaces) < 1 {
return nil, fmt.Errorf("APIExport %q status.virtualWorkspaces is empty", apiExportName)
if len(list.Items) > 0 && isAPIExportReady(&list.Items[0]) {
cfg = rest.CopyConfig(cfg)
// TODO: sharding support
cfg.Host = list.Items[0].Status.VirtualWorkspaces[0].URL
return cfg, nil
}

cfg = rest.CopyConfig(cfg)
// TODO(ncdc): sharding support
cfg.Host = apiExport.Status.VirtualWorkspaces[0].URL
setupLog.Info("Watching for APIExport to become ready", "name", apiExportName)

return cfg, nil
rw, err := retrywatch.NewRetryWatcher(list.ResourceVersion, watcher(apiExportClient.Watch).FilteredBy(selector))
if err != nil {
return nil, fmt.Errorf("error creating retry watcher for APIExport: %w", err)
}
defer rw.Stop()

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case e := <-rw.ResultChan():
switch e.Type {
case watch.Error:
return nil, fmt.Errorf("error watching for APIExport: %w", apierrors.FromObject(e.Object))

case watch.Added, watch.Modified:
apiExport, ok := e.Object.(*apisv1alpha1.APIExport)
if !ok {
return nil, fmt.Errorf("unexpected event object: %v", e.Object)
}
if !isAPIExportReady(apiExport) {
continue
}
cfg = rest.CopyConfig(cfg)
// TODO: sharding support
cfg.Host = apiExport.Status.VirtualWorkspaces[0].URL
return cfg, nil
}
}
}
}

func kcpAPIsGroupPresent(restConfig *rest.Config) bool {
Expand All @@ -228,3 +244,29 @@ func kcpAPIsGroupPresent(restConfig *rest.Config) bool {
}
return false
}

func isAPIExportReady(apiExport *apisv1alpha1.APIExport) bool {
if !conditions.IsTrue(apiExport, apisv1alpha1.APIExportVirtualWorkspaceURLsReady) {
setupLog.Info("APIExport virtual workspace URLs are not ready", "APIExport", apiExport.Name)
return false
}

if len(apiExport.Status.VirtualWorkspaces) == 0 {
setupLog.Info("APIExport does not have any virtual workspace URLs", "APIExport", apiExport.Name)
return false
}

return true
}

type watcher func(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) (watch.Interface, error)

func (w watcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return w(context.TODO(), &apisv1alpha1.APIExportList{}, &client.ListOptions{Raw: &options})
}

func (w watcher) FilteredBy(selector fields.Selector) watcher {
return func(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) (watch.Interface, error) {
return w(ctx, obj, append(opts, client.MatchingFieldsSelector{Selector: selector})...)
}
}