Skip to content

Commit 3af79ea

Browse files
Shawn HurleyEric Stroczynski
authored andcommitted
pkg/{ansible,helm}/controller: cached default manager client (#1047)
* pkg/{ansible,helm}/controller: change the default manager client. The new default client for Ansible and Helm with use the cache for the reader interface. * pkg/ansible/controller: read directly from API server in status updaters * CHANGELOG.md: add Ansible/Helm cached reader change
1 parent efdb79f commit 3af79ea

File tree

7 files changed

+98
-64
lines changed

7 files changed

+98
-64
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
- Updated CRD generation for non-Go operators to use valid structural schema. ([#2275](https://github.com/operator-framework/operator-sdk/issues/2275))
4141
- Replace Role verb `"*"` with list of verb strings in generated files so the Role is compatible with OpenShift and Kubernetes. ([#2175](https://github.com/operator-framework/operator-sdk/pull/2175))
4242
- **Breaking change:** An existing CSV's `spec.customresourcedefinitions.owned` is now always overwritten except for each `name`, `version`, and `kind` on invoking [`olm-catalog gen-csv`](https://github.com/operator-framework/operator-sdk/blob/d147bb3/doc/cli/operator-sdk_olm-catalog_gen-csv.md) when Go API code [annotations](https://github.com/operator-framework/operator-sdk/blob/d147bb3/doc/user/olm-catalog/csv-annotations.md) are present. ([#1162](https://github.com/operator-framework/operator-sdk/pull/1162))
43+
- Ansible and Helm operator reconcilers use a cached client for reads instead of the default unstructured client. ([#1047](https://github.com/operator-framework/operator-sdk/pull/1047))
4344

4445
### Deprecated
4546

hack/tests/e2e-ansible.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ load_image_if_kind "$METRICS_TEST_IMAGE"
172172

173173
OPERATORDIR="$(pwd)"
174174

175-
deploy_operator
176175
trap_add 'remove_operator' EXIT
176+
deploy_operator
177177
test_operator
178178
remove_operator
179179

pkg/ansible/controller/controller.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ func Add(mgr manager.Manager, options Options) *controller.Controller {
6565
EventHandlers: eventHandlers,
6666
ReconcilePeriod: options.ReconcilePeriod,
6767
ManageStatus: options.ManageStatus,
68+
APIReader: mgr.GetAPIReader(),
6869
}
6970

7071
scheme := mgr.GetScheme()

pkg/ansible/controller/reconcile.go

Lines changed: 58 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type AnsibleOperatorReconciler struct {
5555
GVK schema.GroupVersionKind
5656
Runner runner.Runner
5757
Client client.Client
58+
APIReader client.Reader
5859
EventHandlers []events.EventHandler
5960
ReconcilePeriod time.Duration
6061
ManageStatus bool
@@ -84,7 +85,8 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
8485
duration, err := time.ParseDuration(ds)
8586
if err != nil {
8687
// Should attempt to update to a failed condition
87-
if errmark := r.markError(u, request.NamespacedName, fmt.Sprintf("Unable to parse reconcile period annotation: %v", err)); errmark != nil {
88+
errmark := r.markError(u, request.NamespacedName, fmt.Sprintf("Unable to parse reconcile period annotation: %v", err))
89+
if errmark != nil {
8890
logger.Error(errmark, "Unable to mark error annotation")
8991
}
9092
logger.Error(err, "Unable to parse reconcile period annotation")
@@ -124,10 +126,10 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
124126
}
125127

126128
if r.ManageStatus {
127-
err = r.markRunning(u, request.NamespacedName)
128-
if err != nil {
129-
logger.Error(err, "Unable to update the status to mark cr as running")
130-
return reconcileResult, err
129+
errmark := r.markRunning(u, request.NamespacedName)
130+
if errmark != nil {
131+
logger.Error(errmark, "Unable to update the status to mark cr as running")
132+
return reconcileResult, errmark
131133
}
132134
}
133135

@@ -140,7 +142,8 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
140142

141143
kc, err := kubeconfig.Create(ownerRef, "http://localhost:8888", u.GetNamespace())
142144
if err != nil {
143-
if errmark := r.markError(u, request.NamespacedName, "Unable to run reconciliation"); errmark != nil {
145+
errmark := r.markError(u, request.NamespacedName, "Unable to run reconciliation")
146+
if errmark != nil {
144147
logger.Error(errmark, "Unable to mark error to run reconciliation")
145148
}
146149
logger.Error(err, "Unable to generate kubeconfig")
@@ -153,7 +156,8 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
153156
}()
154157
result, err := r.Runner.Run(ident, u, kc.Name())
155158
if err != nil {
156-
if errmark := r.markError(u, request.NamespacedName, "Unable to run reconciliation"); errmark != nil {
159+
errmark := r.markError(u, request.NamespacedName, "Unable to run reconciliation")
160+
if errmark != nil {
157161
logger.Error(errmark, "Unable to mark error to run reconciliation")
158162
}
159163
logger.Error(err, "Unable to run ansible runner")
@@ -193,13 +197,13 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
193197
return reconcileResult, eventErr
194198
}
195199

196-
// Need to get the unstructured object after ansible
197-
// this needs to hit the API
198-
err = r.Client.Get(context.TODO(), request.NamespacedName, u)
199-
if apierrors.IsNotFound(err) {
200-
return reconcile.Result{}, nil
201-
}
200+
// Need to get the unstructured object after the Ansible runner finishes.
201+
// This needs to hit the API server to retrieve updates.
202+
err = r.APIReader.Get(context.TODO(), request.NamespacedName, u)
202203
if err != nil {
204+
if apierrors.IsNotFound(err) {
205+
return reconcile.Result{}, nil
206+
}
203207
return reconcile.Result{}, err
204208
}
205209

@@ -225,27 +229,24 @@ func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconc
225229
}
226230
}
227231
if r.ManageStatus {
228-
err = r.markDone(u, request.NamespacedName, statusEvent, failureMessages)
229-
if err != nil {
230-
logger.Error(err, "Failed to mark status done")
232+
errmark := r.markDone(u, request.NamespacedName, statusEvent, failureMessages)
233+
if errmark != nil {
234+
logger.Error(errmark, "Failed to mark status done")
231235
}
236+
return reconcileResult, errmark
232237
}
233-
return reconcileResult, err
238+
return reconcileResult, nil
234239
}
235240

236241
func (r *AnsibleOperatorReconciler) markRunning(u *unstructured.Unstructured, namespacedName types.NamespacedName) error {
237-
// Get the latest resource to prevent updating a stale status
238-
err := r.Client.Get(context.TODO(), namespacedName, u)
239-
if err != nil {
242+
// Get the latest resource to prevent updating a stale status.
243+
if err := r.APIReader.Get(context.TODO(), namespacedName, u); err != nil {
240244
return err
241245
}
242-
statusInterface := u.Object["status"]
243-
statusMap, _ := statusInterface.(map[string]interface{})
244-
crStatus := ansiblestatus.CreateFromMap(statusMap)
246+
crStatus := getStatus(u)
245247

246248
// If there is no current status add that we are working on this resource.
247249
errCond := ansiblestatus.GetCondition(crStatus, ansiblestatus.FailureConditionType)
248-
249250
if errCond != nil {
250251
errCond.Status = v1.ConditionFalse
251252
ansiblestatus.SetCondition(&crStatus, *errCond)
@@ -262,34 +263,26 @@ func (r *AnsibleOperatorReconciler) markRunning(u *unstructured.Unstructured, na
262263
)
263264
ansiblestatus.SetCondition(&crStatus, *c)
264265
u.Object["status"] = crStatus.GetJSONMap()
265-
err = r.Client.Status().Update(context.TODO(), u)
266-
if err != nil {
267-
return err
268-
}
269-
return nil
266+
267+
return r.Client.Status().Update(context.TODO(), u)
270268
}
271269

272270
// markError - used to alert the user to the issues during the validation of a reconcile run.
273271
// i.e Annotations that could be incorrect
274272
func (r *AnsibleOperatorReconciler) markError(u *unstructured.Unstructured, namespacedName types.NamespacedName, failureMessage string) error {
275273
logger := logf.Log.WithName("markError")
274+
// Immediately update metrics with failed reconciliation, since Get()
275+
// may fail.
276276
metrics.ReconcileFailed(r.GVK.String())
277-
// Get the latest resource to prevent updating a stale status
278-
err := r.Client.Get(context.TODO(), namespacedName, u)
279-
if apierrors.IsNotFound(err) {
280-
logger.Info("Resource not found, assuming it was deleted")
281-
return nil
282-
}
283-
if err != nil {
277+
// Get the latest resource to prevent updating a stale status.
278+
if err := r.APIReader.Get(context.TODO(), namespacedName, u); err != nil {
279+
if apierrors.IsNotFound(err) {
280+
logger.Info("Resource not found, assuming it was deleted")
281+
return nil
282+
}
284283
return err
285284
}
286-
statusInterface := u.Object["status"]
287-
statusMap, ok := statusInterface.(map[string]interface{})
288-
// If the map is not available create one.
289-
if !ok {
290-
statusMap = map[string]interface{}{}
291-
}
292-
crStatus := ansiblestatus.CreateFromMap(statusMap)
285+
crStatus := getStatus(u)
293286

294287
sc := ansiblestatus.GetCondition(crStatus, ansiblestatus.RunningConditionType)
295288
if sc != nil {
@@ -313,27 +306,26 @@ func (r *AnsibleOperatorReconciler) markError(u *unstructured.Unstructured, name
313306

314307
func (r *AnsibleOperatorReconciler) markDone(u *unstructured.Unstructured, namespacedName types.NamespacedName, statusEvent eventapi.StatusJobEvent, failureMessages eventapi.FailureMessages) error {
315308
logger := logf.Log.WithName("markDone")
316-
// Get the latest resource to prevent updating a stale status
317-
err := r.Client.Get(context.TODO(), namespacedName, u)
318-
if apierrors.IsNotFound(err) {
319-
logger.Info("Resource not found, assuming it was deleted")
320-
return nil
321-
}
322-
if err != nil {
309+
// Get the latest resource to prevent updating a stale status.
310+
if err := r.APIReader.Get(context.TODO(), namespacedName, u); err != nil {
311+
if apierrors.IsNotFound(err) {
312+
logger.Info("Resource not found, assuming it was deleted")
313+
return nil
314+
}
323315
return err
324316
}
325-
statusInterface := u.Object["status"]
326-
statusMap, _ := statusInterface.(map[string]interface{})
327-
crStatus := ansiblestatus.CreateFromMap(statusMap)
317+
crStatus := getStatus(u)
328318

329319
runSuccessful := len(failureMessages) == 0
330320
ansibleStatus := ansiblestatus.NewAnsibleResultFromStatusJobEvent(statusEvent)
331321

332322
if !runSuccessful {
333323
metrics.ReconcileFailed(r.GVK.String())
334324
sc := ansiblestatus.GetCondition(crStatus, ansiblestatus.RunningConditionType)
335-
sc.Status = v1.ConditionFalse
336-
ansiblestatus.SetCondition(&crStatus, *sc)
325+
if sc != nil {
326+
sc.Status = v1.ConditionFalse
327+
ansiblestatus.SetCondition(&crStatus, *sc)
328+
}
337329
c := ansiblestatus.NewCondition(
338330
ansiblestatus.FailureConditionType,
339331
v1.ConditionTrue,
@@ -369,3 +361,14 @@ func contains(l []string, s string) bool {
369361
}
370362
return false
371363
}
364+
365+
// getStatus returns u's "status" block as a status.Status.
366+
func getStatus(u *unstructured.Unstructured) ansiblestatus.Status {
367+
statusInterface := u.Object["status"]
368+
statusMap, ok := statusInterface.(map[string]interface{})
369+
// If the map is not available create one.
370+
if !ok {
371+
statusMap = map[string]interface{}{}
372+
}
373+
return ansiblestatus.CreateFromMap(statusMap)
374+
}

pkg/ansible/controller/reconcile_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,7 @@ func TestReconcile(t *testing.T) {
487487
GVK: tc.GVK,
488488
Runner: tc.Runner,
489489
Client: tc.Client,
490+
APIReader: tc.Client,
490491
EventHandlers: tc.EventHandlers,
491492
ReconcilePeriod: tc.ReconcilePeriod,
492493
ManageStatus: tc.ManageStatus,

pkg/ansible/run.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,28 @@ import (
2121
"runtime"
2222

2323
"github.com/operator-framework/operator-sdk/pkg/ansible/controller"
24+
aoflags "github.com/operator-framework/operator-sdk/pkg/ansible/flags"
25+
proxy "github.com/operator-framework/operator-sdk/pkg/ansible/proxy"
2426
"github.com/operator-framework/operator-sdk/pkg/ansible/proxy/controllermap"
2527
"github.com/operator-framework/operator-sdk/pkg/ansible/runner"
2628
"github.com/operator-framework/operator-sdk/pkg/ansible/watches"
2729
"github.com/operator-framework/operator-sdk/pkg/k8sutil"
30+
kubemetrics "github.com/operator-framework/operator-sdk/pkg/kube-metrics"
2831
"github.com/operator-framework/operator-sdk/pkg/leader"
2932
"github.com/operator-framework/operator-sdk/pkg/metrics"
33+
sdkVersion "github.com/operator-framework/operator-sdk/version"
34+
35+
v1 "k8s.io/api/core/v1"
36+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3037
"k8s.io/apimachinery/pkg/runtime/schema"
3138
"k8s.io/apimachinery/pkg/util/intstr"
39+
"k8s.io/client-go/rest"
40+
"sigs.k8s.io/controller-runtime/pkg/cache"
41+
"sigs.k8s.io/controller-runtime/pkg/client"
3242
"sigs.k8s.io/controller-runtime/pkg/client/config"
43+
logf "sigs.k8s.io/controller-runtime/pkg/log"
3344
"sigs.k8s.io/controller-runtime/pkg/manager"
3445
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
35-
36-
aoflags "github.com/operator-framework/operator-sdk/pkg/ansible/flags"
37-
proxy "github.com/operator-framework/operator-sdk/pkg/ansible/proxy"
38-
kubemetrics "github.com/operator-framework/operator-sdk/pkg/kube-metrics"
39-
sdkVersion "github.com/operator-framework/operator-sdk/version"
40-
v1 "k8s.io/api/core/v1"
41-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42-
logf "sigs.k8s.io/controller-runtime/pkg/log"
4346
)
4447

4548
var (
@@ -79,6 +82,17 @@ func Run(flags *aoflags.AnsibleOperatorFlags) error {
7982
mgr, err := manager.New(cfg, manager.Options{
8083
Namespace: namespace,
8184
MetricsBindAddress: fmt.Sprintf("%s:%d", metricsHost, metricsPort),
85+
NewClient: func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
86+
c, err := client.New(config, options)
87+
if err != nil {
88+
return nil, err
89+
}
90+
return &client.DelegatingClient{
91+
Reader: cache,
92+
Writer: c,
93+
StatusClient: c,
94+
}, nil
95+
},
8296
})
8397
if err != nil {
8498
log.Error(err, "Failed to create a new manager.")

pkg/helm/run.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ import (
3434
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3535
"k8s.io/apimachinery/pkg/runtime/schema"
3636
"k8s.io/apimachinery/pkg/util/intstr"
37+
"k8s.io/client-go/rest"
38+
"sigs.k8s.io/controller-runtime/pkg/cache"
39+
crclient "sigs.k8s.io/controller-runtime/pkg/client"
3740
"sigs.k8s.io/controller-runtime/pkg/client/config"
3841
logf "sigs.k8s.io/controller-runtime/pkg/log"
3942
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -80,6 +83,17 @@ func Run(flags *hoflags.HelmOperatorFlags) error {
8083
mgr, err := manager.New(cfg, manager.Options{
8184
Namespace: namespace,
8285
MetricsBindAddress: fmt.Sprintf("%s:%d", metricsHost, metricsPort),
86+
NewClient: func(cache cache.Cache, config *rest.Config, options crclient.Options) (crclient.Client, error) {
87+
c, err := crclient.New(config, options)
88+
if err != nil {
89+
return nil, err
90+
}
91+
return &crclient.DelegatingClient{
92+
Reader: cache,
93+
Writer: c,
94+
StatusClient: c,
95+
}, nil
96+
},
8397
})
8498
if err != nil {
8599
log.Error(err, "Failed to create a new manager.")

0 commit comments

Comments
 (0)