Skip to content

Client tests #27

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import (

var log = logf.KBLog.WithName("object-cache")

// objectCache is a ReadInterface
var _ client.ReadInterface = &objectCache{}
// objectCache is a Reader
var _ client.Reader = &objectCache{}

// objectCache is a Kubernetes Object cache populated from Informers
type objectCache struct {
Expand All @@ -47,7 +47,7 @@ type objectCache struct {
informers *informers
}

var _ client.ReadInterface = &objectCache{}
var _ client.Reader = &objectCache{}

// addInformer adds an informer to the objectCache
func (o *objectCache) addInformer(gvk schema.GroupVersionKind, c cache.SharedIndexInformer) {
Expand Down Expand Up @@ -106,7 +106,7 @@ func (o *objectCache) cacheFor(obj runtime.Object) (*singleObjectCache, error) {
return cache, nil
}

// Get implements populatingClient.ReadInterface
// Get implements populatingClient.Reader
func (o *objectCache) Get(ctx context.Context, key client.ObjectKey, out runtime.Object) error {
// Make sure there is a Cache for this type
if !o.has(out) {
Expand All @@ -124,7 +124,7 @@ func (o *objectCache) Get(ctx context.Context, key client.ObjectKey, out runtime
return cache.Get(ctx, key, out)
}

// List implements populatingClient.ReadInterface
// List implements populatingClient.Reader
func (o *objectCache) List(ctx context.Context, opts *client.ListOptions, out runtime.Object) error {
itemsPtr, err := apimeta.GetItemsPtr(out)
if err != nil {
Expand All @@ -148,10 +148,10 @@ func (o *objectCache) List(ctx context.Context, opts *client.ListOptions, out ru
return cache.List(ctx, opts, out)
}

// singleObjectCache is a ReadInterface
var _ client.ReadInterface = &singleObjectCache{}
// singleObjectCache is a Reader
var _ client.Reader = &singleObjectCache{}

// singleObjectCache is a ReadInterface that retrieves objects
// singleObjectCache is a Reader that retrieves objects
// from a single local cache populated by a watch.
type singleObjectCache struct {
// Indexer is the underlying indexer wrapped by this cache.
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var _ = Describe("Indexers", func() {
})

Describe("populatingClient interface wrapper around an indexer", func() {
var singleCache client.ReadInterface
var singleCache client.Reader

BeforeEach(func() {
var err error
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
"k8s.io/client-go/tools/cache"
)

// Cache implements ReadInterface by reading objects from a cache populated by Informers
// Cache implements Reader by reading objects from a cache populated by Informers
type Cache interface {
client.ReadInterface
client.Reader
Informers
IndexField(obj runtime.Object, field string, extractValue client.IndexerFunc) error
}
Expand Down
192 changes: 71 additions & 121 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"context"
"fmt"
"reflect"
"sync"

"github.com/kubernetes-sigs/controller-runtime/pkg/client/apiutil"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
serializer "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"

"github.com/kubernetes-sigs/controller-runtime/pkg/client/apiutil"
"k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Options are creation options for a Client
Expand All @@ -26,6 +40,10 @@ type Options struct {

// New returns a new Client using the provided config and Options.
func New(config *rest.Config, options Options) (Client, error) {
if config == nil {
return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
}

// Init a scheme if none provided
if options.Scheme == nil {
options.Scheme = scheme.Scheme
Expand All @@ -40,161 +58,93 @@ func New(config *rest.Config, options Options) (Client, error) {
}
}

c := &populatingClient{
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),
paramCodec: runtime.NewParameterCodec(options.Scheme),
clientsByType: make(map[reflect.Type]rest.Interface),
resourcesByType: make(map[reflect.Type]string),
c := &client{
cache: clientCache{
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),
resourceByType: make(map[reflect.Type]*resourceMeta),
},
paramCodec: runtime.NewParameterCodec(options.Scheme),
}

return c, nil
}

var _ Client = &populatingClient{}

// populatingClient is an Client that reads and writes directly from/to an API server. It lazily initialized
// new clients when they are used.
type populatingClient struct {
config *rest.Config
scheme *runtime.Scheme
mapper meta.RESTMapper

codecs serializer.CodecFactory
paramCodec runtime.ParameterCodec
clientsByType map[reflect.Type]rest.Interface
resourcesByType map[reflect.Type]string
mu sync.RWMutex
}

// makeClient maps obj to a Kubernetes Resource and constructs a populatingClient for that Resource.
func (c *populatingClient) makeClient(obj runtime.Object) (rest.Interface, string, error) {
gvk, err := apiutil.GVKForObject(obj, c.scheme)
if err != nil {
return nil, "", err
}
client, err := apiutil.RESTClientForGVK(gvk, c.config, c.codecs)
if err != nil {
return nil, "", err
}
mapping, err := c.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, "", err
}
return client, mapping.Resource, nil
}

// clientFor returns a raw rest.Client for the given object type.
func (c *populatingClient) clientFor(obj runtime.Object) (rest.Interface, string, error) {
typ := reflect.TypeOf(obj)

// It's better to do creation work twice than to not let multiple
// people make requests at once
c.mu.RLock()
client, known := c.clientsByType[typ]
resource, _ := c.resourcesByType[typ]
c.mu.RUnlock()

// Initialize a new Client
if !known {
var err error
client, resource, err = c.makeClient(obj)
if err != nil {
return nil, "", err
}
c.mu.Lock()
defer c.mu.Unlock()
c.clientsByType[typ] = client
c.resourcesByType[typ] = resource
}

return client, resource, nil
}
var _ Client = &client{}

func (c *populatingClient) metaAndClientFor(obj runtime.Object) (v1.Object, rest.Interface, string, error) {
client, resource, err := c.clientFor(obj)
if err != nil {
return nil, nil, "", err
}
meta, err := meta.Accessor(obj)
if err != nil {
return nil, nil, "", err
}
return meta, client, resource, err
// client is a client.Client that reads and writes directly from/to an API server. It lazily initializes
// new clients at the time they are used, and caches the client.
type client struct {
cache clientCache
paramCodec runtime.ParameterCodec
}

// Create implements Client
func (c *populatingClient) Create(ctx context.Context, obj runtime.Object) error {
meta, client, resource, err := c.metaAndClientFor(obj)
// Create implements client.Client
func (c *client) Create(ctx context.Context, obj runtime.Object) error {
o, err := c.cache.getObjMeta(obj)
if err != nil {
return err
}
return client.Post().
Namespace(meta.GetNamespace()).
Resource(resource).
return o.Post().
NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()).
Resource(o.resource()).
Body(obj).
Do().
Into(obj)
}

// Update implements Client
func (c *populatingClient) Update(ctx context.Context, obj runtime.Object) error {
meta, client, resource, err := c.metaAndClientFor(obj)
// Update implements client.Client
func (c *client) Update(ctx context.Context, obj runtime.Object) error {
o, err := c.cache.getObjMeta(obj)
if err != nil {
return err
}
return client.Put().
Namespace(meta.GetNamespace()).
Resource(resource).
Name(meta.GetName()).
return o.Put().
NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()).
Resource(o.resource()).
Name(o.GetName()).
Body(obj).
Do().
Into(obj)
}

// Delete implements Client
func (c *populatingClient) Delete(ctx context.Context, obj runtime.Object) error {
meta, client, resource, err := c.metaAndClientFor(obj)
// Delete implements client.Client
func (c *client) Delete(ctx context.Context, obj runtime.Object) error {
o, err := c.cache.getObjMeta(obj)
if err != nil {
return err
}
return client.Delete().
Namespace(meta.GetNamespace()).
Resource(resource).
Name(meta.GetName()).
return o.Delete().
NamespaceIfScoped(o.GetNamespace(), o.isNamespaced()).
Resource(o.resource()).
Name(o.GetName()).
Do().
Error()
}

// Get implements Client
func (c *populatingClient) Get(ctx context.Context, key ObjectKey, obj runtime.Object) error {
client, resource, err := c.clientFor(obj)
// Get implements client.Client
func (c *client) Get(ctx context.Context, key ObjectKey, obj runtime.Object) error {
r, err := c.cache.getResource(obj)
if err != nil {
return err
}
return client.Get().
Namespace(key.Namespace).
Resource(resource).
Name(key.Name).
Do().
Into(obj)
return r.Get().
NamespaceIfScoped(key.Namespace, r.isNamespaced()).
Resource(r.resource()).
Name(key.Name).Do().Into(obj)
}

// List implements Client
func (c *populatingClient) List(ctx context.Context, opts *ListOptions, obj runtime.Object) error {
client, resource, err := c.clientFor(obj)
// List implements client.Client
func (c *client) List(ctx context.Context, opts *ListOptions, obj runtime.Object) error {
r, err := c.cache.getResource(obj)
if err != nil {
return err
}
ns := ""
if opts != nil {
ns = opts.Namespace
}
return client.Get().
Namespace(ns).
Resource(resource).
return r.Get().
NamespaceIfScoped(opts.Namespace, r.isNamespaced()).
Resource(r.resource()).
Body(obj).
VersionedParams(opts.AsListOptions(), c.paramCodec).
Do().
Expand Down
Loading