Skip to content

Commit 2058fed

Browse files
committed
fix(cache): update cache with O(1) data structures
The prior implementation used arrays to store cached objects. This meant that updates were O(n). In a controller I developed that managed ~70k PV and ~70k pvc 99.9% of CPU time was spent in cache updates pegging the entire process. This new implementation doesn't even have cache updates show up in the profiles and is using ~25m cpu for the same number of objects.
1 parent ea5041d commit 2058fed

File tree

2 files changed

+103
-85
lines changed

2 files changed

+103
-85
lines changed

src/cache.ts

Lines changed: 85 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,23 @@ export interface ObjectCache<T> {
2020
list(namespace?: string): ReadonlyArray<T>;
2121
}
2222

23+
// exported for testing
24+
export type CacheMap<T extends KubernetesObject> = Map<string, Map<string, T>>;
25+
2326
export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, Informer<T> {
24-
private objects: T[] = [];
27+
private objects: CacheMap<T> = new Map();
2528
private resourceVersion: string;
26-
private readonly indexCache: { [key: string]: T[] } = {};
27-
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T> | ErrorCallback> } = {};
29+
private readonly callbackCache: {
30+
[key: string]: Array<ObjectCallback<T> | ErrorCallback>;
31+
} = {};
2832
private request: RequestResult | undefined;
29-
private stopped: boolean = false;
33+
private stopped = false;
3034

3135
public constructor(
3236
private readonly path: string,
3337
private readonly watch: Watch,
3438
private readonly listFn: ListPromise<T>,
35-
autoStart: boolean = true,
39+
autoStart = true,
3640
private readonly labelSelector?: string,
3741
) {
3842
this.callbackCache[ADD] = [];
@@ -93,18 +97,19 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
9397
}
9498

9599
public get(name: string, namespace?: string): T | undefined {
96-
return this.objects.find(
97-
(obj: T): boolean => {
98-
return obj.metadata!.name === name && (!namespace || obj.metadata!.namespace === namespace);
99-
},
100-
);
100+
const nsObjects = this.objects.get(namespace || '');
101+
if (nsObjects) {
102+
return nsObjects.get(name);
103+
}
104+
return undefined;
101105
}
102106

103107
public list(namespace?: string | undefined): ReadonlyArray<T> {
104-
if (!namespace) {
105-
return this.objects;
108+
const namespaceObjects = this.objects.get(namespace || '');
109+
if (!namespaceObjects) {
110+
return [];
106111
}
107-
return this.indexCache[namespace] as ReadonlyArray<T>;
112+
return Array.from(namespaceObjects.values());
108113
}
109114

110115
public latestResourceVersion(): string {
@@ -118,7 +123,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
118123
}
119124
}
120125

121-
private async doneHandler(err: any): Promise<any> {
126+
private async doneHandler(err: unknown): Promise<void> {
122127
this._stop();
123128
if (
124129
err &&
@@ -139,16 +144,8 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
139144
const result = await promise;
140145
const list = result.body;
141146
this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
142-
Object.keys(this.indexCache).forEach((key) => {
143-
const updateObjects = deleteItems(this.indexCache[key], list.items);
144-
if (updateObjects.length !== 0) {
145-
this.indexCache[key] = updateObjects;
146-
} else {
147-
delete this.indexCache[key];
148-
}
149-
});
150147
this.addOrUpdateItems(list.items);
151-
this.resourceVersion = list.metadata!.resourceVersion!;
148+
this.resourceVersion = list.metadata!.resourceVersion || '';
152149
}
153150
const queryParams = {
154151
resourceVersion: this.resourceVersion,
@@ -175,21 +172,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
175172
this.callbackCache[ADD].slice(),
176173
this.callbackCache[UPDATE].slice(),
177174
);
178-
if (obj.metadata!.namespace) {
179-
this.indexObj(obj);
180-
}
181175
});
182176
}
183177

184-
private indexObj(obj: T): void {
185-
let namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
186-
if (!namespaceList) {
187-
namespaceList = [];
188-
this.indexCache[obj.metadata!.namespace!] = namespaceList;
189-
}
190-
addOrUpdateObject(namespaceList, obj);
191-
}
192-
193178
private async watchHandler(
194179
phase: string,
195180
obj: T,
@@ -204,18 +189,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
204189
this.callbackCache[ADD].slice(),
205190
this.callbackCache[UPDATE].slice(),
206191
);
207-
if (obj.metadata!.namespace) {
208-
this.indexObj(obj);
209-
}
210192
break;
211193
case 'DELETED':
212194
deleteObject(this.objects, obj, this.callbackCache[DELETE].slice());
213-
if (obj.metadata!.namespace) {
214-
const namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
215-
if (namespaceList) {
216-
deleteObject(namespaceList, obj);
217-
}
218-
}
219195
break;
220196
case 'BOOKMARK':
221197
// nothing to do, here for documentation, mostly.
@@ -228,50 +204,83 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
228204
}
229205
}
230206

207+
// exported for testing
208+
export function cacheMapFromList<T extends KubernetesObject>(newObjects: T[]): CacheMap<T> {
209+
const objects: CacheMap<T> = new Map();
210+
// build up the new list
211+
for (const obj of newObjects) {
212+
let namespaceObjects = objects.get(obj.metadata!.namespace || '');
213+
if (!namespaceObjects) {
214+
namespaceObjects = new Map();
215+
objects.set(obj.metadata!.namespace || '', namespaceObjects);
216+
}
217+
218+
const name = obj.metadata!.name || '';
219+
namespaceObjects.set(name, obj);
220+
}
221+
return objects;
222+
}
223+
231224
// external for testing
232225
export function deleteItems<T extends KubernetesObject>(
233-
oldObjects: T[],
226+
oldObjects: CacheMap<T>,
234227
newObjects: T[],
235228
deleteCallback?: Array<ObjectCallback<T>>,
236-
): T[] {
237-
return oldObjects.filter((obj: T) => {
238-
if (findKubernetesObject(newObjects, obj) === -1) {
239-
if (deleteCallback) {
240-
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(obj));
229+
): CacheMap<T> {
230+
const objects = cacheMapFromList(newObjects);
231+
232+
if (!deleteCallback) {
233+
return objects;
234+
}
235+
236+
for (const [namespace, oldNamespaceObjects] of oldObjects.entries()) {
237+
const newNamespaceObjects = objects.get(namespace);
238+
if (newNamespaceObjects) {
239+
for (const [name, oldObj] of oldNamespaceObjects.entries()) {
240+
if (newNamespaceObjects.has(name)) {
241+
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(oldObj));
242+
}
241243
}
242-
return false;
244+
} else {
245+
oldNamespaceObjects.forEach((obj: T) => {
246+
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(obj));
247+
});
243248
}
244-
return true;
245-
});
249+
}
250+
251+
return objects;
246252
}
247253

248254
// Only public for testing.
249255
export function addOrUpdateObject<T extends KubernetesObject>(
250-
objects: T[],
256+
objects: CacheMap<T>,
251257
obj: T,
252258
addCallback?: Array<ObjectCallback<T>>,
253259
updateCallback?: Array<ObjectCallback<T>>,
254260
): void {
255-
const ix = findKubernetesObject(objects, obj);
256-
if (ix === -1) {
257-
objects.push(obj);
261+
let namespaceObjects = objects.get(obj.metadata!.namespace || '');
262+
if (!namespaceObjects) {
263+
namespaceObjects = new Map();
264+
objects.set(obj.metadata!.namespace || '', namespaceObjects);
265+
}
266+
267+
const name = obj.metadata!.name || '';
268+
const found = namespaceObjects.get(name);
269+
if (!found) {
270+
namespaceObjects.set(name, obj);
258271
if (addCallback) {
259272
addCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
260273
}
261274
} else {
262-
if (!isSameVersion(objects[ix], obj)) {
263-
objects[ix] = obj;
275+
if (!isSameVersion(found, obj)) {
276+
namespaceObjects.set(name, obj);
264277
if (updateCallback) {
265278
updateCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
266279
}
267280
}
268281
}
269282
}
270283

271-
function isSameObject<T extends KubernetesObject>(o1: T, o2: T): boolean {
272-
return o1.metadata!.name === o2.metadata!.name && o1.metadata!.namespace === o2.metadata!.namespace;
273-
}
274-
275284
function isSameVersion<T extends KubernetesObject>(o1: T, o2: T): boolean {
276285
return (
277286
o1.metadata!.resourceVersion !== undefined &&
@@ -280,23 +289,26 @@ function isSameVersion<T extends KubernetesObject>(o1: T, o2: T): boolean {
280289
);
281290
}
282291

283-
function findKubernetesObject<T extends KubernetesObject>(objects: T[], obj: T): number {
284-
return objects.findIndex((elt: T) => {
285-
return isSameObject(elt, obj);
286-
});
287-
}
288-
289292
// Public for testing.
290293
export function deleteObject<T extends KubernetesObject>(
291-
objects: T[],
294+
objects: CacheMap<T>,
292295
obj: T,
293296
deleteCallback?: Array<ObjectCallback<T>>,
294297
): void {
295-
const ix = findKubernetesObject(objects, obj);
296-
if (ix !== -1) {
297-
objects.splice(ix, 1);
298+
const namespace = obj.metadata!.namespace || '';
299+
const name = obj.metadata!.name || '';
300+
301+
const namespaceObjects = objects.get(namespace);
302+
if (!namespaceObjects) {
303+
return;
304+
}
305+
const deleted = namespaceObjects.delete(name);
306+
if (deleted) {
298307
if (deleteCallback) {
299308
deleteCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
300309
}
310+
if (namespaceObjects.size === 0) {
311+
objects.delete(namespace);
312+
}
301313
}
302314
}

src/cache_test.ts

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { Duplex } from 'stream';
99
import { EventEmitter } from 'ws';
1010

1111
import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1ListMeta } from './api';
12-
import { deleteObject, ListWatch, deleteItems } from './cache';
12+
import { deleteObject, ListWatch, deleteItems, CacheMap, cacheMapFromList } from './cache';
1313
import { KubeConfig } from './config';
1414
import { Cluster, Context, User } from './config_types';
1515
import { ADD, UPDATE, DELETE, ERROR, ListPromise, CHANGE } from './informer';
@@ -689,7 +689,7 @@ describe('ListWatchCache', () => {
689689
});
690690

691691
it('should delete an object correctly', () => {
692-
const list: V1Pod[] = [
692+
const cache: CacheMap<V1Pod> = cacheMapFromList([
693693
{
694694
metadata: {
695695
name: 'name1',
@@ -702,28 +702,34 @@ describe('ListWatchCache', () => {
702702
namespace: 'ns2',
703703
} as V1ObjectMeta,
704704
} as V1Pod,
705-
];
706-
deleteObject(list, {
705+
]);
706+
deleteObject(cache, {
707707
metadata: {
708708
name: 'other',
709709
namespace: 'ns1',
710710
},
711711
} as V1Pod);
712-
expect(list.length).to.equal(2);
713-
deleteObject(list, {
712+
expect(cache.size).to.equal(2);
713+
expect(cache.get('ns1').size).to.equal(1);
714+
expect(cache.get('ns2').size).to.equal(1);
715+
deleteObject(cache, {
714716
metadata: {
715717
name: 'name1',
716718
namespace: 'ns2',
717719
},
718720
} as V1Pod);
719-
expect(list.length).to.equal(2);
720-
deleteObject(list, {
721+
expect(cache.size).to.equal(2);
722+
expect(cache.get('ns1').size).to.equal(1);
723+
expect(cache.get('ns2').size).to.equal(1);
724+
deleteObject(cache, {
721725
metadata: {
722726
name: 'name1',
723727
namespace: 'ns1',
724728
},
725729
} as V1Pod);
726-
expect(list.length).to.equal(1);
730+
expect(cache.size).to.equal(1);
731+
expect(cache.has('ns1')).to.equal(false);
732+
expect(cache.get('ns2').size).to.equal(1);
727733
});
728734

729735
it('should not call handlers which have been unregistered', async () => {
@@ -1330,7 +1336,7 @@ describe('ListWatchCache', () => {
13301336

13311337
describe('delete items', () => {
13321338
it('should remove correctly', () => {
1333-
const listA: V1Pod[] = [
1339+
const cacheA: CacheMap<V1Pod> = cacheMapFromList([
13341340
{
13351341
metadata: {
13361342
name: 'name1',
@@ -1343,7 +1349,7 @@ describe('delete items', () => {
13431349
namespace: 'ns2',
13441350
} as V1ObjectMeta,
13451351
} as V1Pod,
1346-
];
1352+
]);
13471353
const listB: V1Pod[] = [
13481354
{
13491355
metadata: {
@@ -1368,7 +1374,7 @@ describe('delete items', () => {
13681374
];
13691375
const pods: V1Pod[] = [];
13701376

1371-
deleteItems(listA, listB, [(obj?: V1Pod) => pods.push(obj!)]);
1377+
deleteItems(cacheA, listB, [(obj?: V1Pod) => pods.push(obj!)]);
13721378
expect(pods).to.deep.equal(expected);
13731379
});
13741380

0 commit comments

Comments
 (0)