Skip to content
This repository was archived by the owner on Apr 24, 2024. It is now read-only.

✨Wait until APIExport virtual workspace URLs are ready #33

Merged
merged 8 commits into from
Dec 2, 2022
90 changes: 52 additions & 38 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,31 @@ import (
"fmt"
"os"

"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/kcp"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

datav1alpha1 "github.com/kcp-dev/controller-runtime-example/api/v1alpha1"
apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
"github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions"

// +kubebuilder:scaffold:imports

datav1alpha1 "github.com/kcp-dev/controller-runtime-example/api/v1alpha1"
"github.com/kcp-dev/controller-runtime-example/controllers"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
)

var (
Expand All @@ -67,7 +68,7 @@ func main() {
var enableLeaderElection bool
var probeAddr string
var apiExportName string
flag.StringVar(&apiExportName, "api-export-name", "", "The name of the APIExport.")
flag.StringVar(&apiExportName, "api-export-name", "data.my.domain", "The name of the APIExport.")
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
Expand Down Expand Up @@ -161,48 +162,61 @@ func main() {
// +kubebuilder:rbac:groups="apis.kcp.dev",resources=apiexports,verbs=get;list;watch

// restConfigForAPIExport returns a *rest.Config properly configured to communicate with the endpoint for the
// APIExport's virtual workspace.
// APIExport's virtual workspace. It blocks until the controller APIExport VirtualWorkspaceURLsReady condition
// becomes truthy, which happens when the APIExport is bound for the first time.
func restConfigForAPIExport(ctx context.Context, cfg *rest.Config, apiExportName string) (*rest.Config, error) {
scheme := runtime.NewScheme()
if err := apisv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("error adding apis.kcp.dev/v1alpha1 to scheme: %w", err)
apiExportClient, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme})
if err != nil {
return nil, fmt.Errorf("error creating APIExport client: %w", err)
}

apiExportClient, err := client.New(cfg, client.Options{Scheme: scheme})
selector := client.MatchingFieldsSelector{
Selector: fields.OneTermEqualSelector("metadata.name", apiExportName),
}
watch, err := apiExportClient.Watch(ctx, &apisv1alpha1.APIExportList{}, selector)
if err != nil {
return nil, fmt.Errorf("error creating APIExport client: %w", err)
return nil, fmt.Errorf("error watching for APIExport: %w", err)
}

var apiExport apisv1alpha1.APIExport
for {
select {
case <-ctx.Done():
watch.Stop()
return nil, ctx.Err()
case e, ok := <-watch.ResultChan():
if !ok {
// The channel has been closed. Let's retry watching in case it timed out on idle,
// or fail in case connection to the server cannot be re-established.
watch, err = apiExportClient.Watch(ctx, &apisv1alpha1.APIExportList{}, selector)
if err != nil {
return nil, fmt.Errorf("error watching for APIExport: %w", err)
}
}

if apiExportName != "" {
if err := apiExportClient.Get(ctx, types.NamespacedName{Name: apiExportName}, &apiExport); err != nil {
return nil, fmt.Errorf("error getting APIExport %q: %w", apiExportName, err)
}
} else {
setupLog.Info("api-export-name is empty - listing")
exports := &apisv1alpha1.APIExportList{}
if err := apiExportClient.List(ctx, exports); err != nil {
return nil, fmt.Errorf("error listing APIExports: %w", err)
}
if len(exports.Items) == 0 {
return nil, fmt.Errorf("no APIExport found")
}
if len(exports.Items) > 1 {
return nil, fmt.Errorf("more than one APIExport found")
}
apiExport = exports.Items[0]
}
apiExport, ok := e.Object.(*apisv1alpha1.APIExport)
if !ok {
continue
}

if len(apiExport.Status.VirtualWorkspaces) < 1 {
return nil, fmt.Errorf("APIExport %q status.virtualWorkspaces is empty", apiExportName)
}
setupLog.Info("APIExport event received", "name", apiExport.Name, "event", e.Type)

if !conditions.IsTrue(apiExport, apisv1alpha1.APIExportVirtualWorkspaceURLsReady) {
setupLog.Info("APIExport virtual workspace URLs are not ready", "APIExport", apiExport.Name)
continue
}

cfg = rest.CopyConfig(cfg)
// TODO(ncdc): sharding support
cfg.Host = apiExport.Status.VirtualWorkspaces[0].URL
if len(apiExport.Status.VirtualWorkspaces) == 0 {
setupLog.Info("APIExport does not have any virtual workspace URLs", "APIExport", apiExport.Name)
continue
}

return cfg, nil
setupLog.Info("Using APIExport to configure client", "APIExport", apiExport.Name)
cfg = rest.CopyConfig(cfg)
// TODO(ncdc): sharding support
cfg.Host = apiExport.Status.VirtualWorkspaces[0].URL
return cfg, nil
}
}
}

func kcpAPIsGroupPresent(restConfig *rest.Config) bool {
Expand Down