Skip to content

Commit d12ec26

Browse files
committed
fix(cache): watch errors must call done handler
The type of `watchObject` was incorrect and has been updated to match the actual request body. Using this info it was clear that 'ERROR' events were not being handled correctly. When the watch receives an error it is not always an http status code, because the status code can only be sent when the stream is starting. This means that `410` resourceVersion out of date errors could only be handled if they were detected before the watch stream started leaving watches running on channels that would never receive more events and not notifying `ListWatch` consumers of the error.
1 parent f274cbf commit d12ec26

File tree

2 files changed

+98
-7
lines changed

2 files changed

+98
-7
lines changed

src/cache.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,10 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
120120

121121
private async doneHandler(err: any): Promise<any> {
122122
this._stop();
123-
if (err && err.statusCode === 410) {
123+
if (
124+
err &&
125+
((err as { statusCode?: number }).statusCode === 410 || (err as { code?: number }).code === 410)
126+
) {
124127
this.resourceVersion = '';
125128
} else if (err) {
126129
this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err));
@@ -187,7 +190,11 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
187190
addOrUpdateObject(namespaceList, obj);
188191
}
189192

190-
private watchHandler(phase: string, obj: T, watchObj?: any): void {
193+
private async watchHandler(
194+
phase: string,
195+
obj: T,
196+
watchObj?: { type: string; object: KubernetesObject },
197+
): Promise<void> {
191198
switch (phase) {
192199
case 'ADDED':
193200
case 'MODIFIED':
@@ -213,10 +220,11 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
213220
case 'BOOKMARK':
214221
// nothing to do, here for documentation, mostly.
215222
break;
223+
case 'ERROR':
224+
await this.doneHandler(obj);
225+
return;
216226
}
217-
if (watchObj && watchObj.metadata) {
218-
this.resourceVersion = watchObj.metadata.resourceVersion;
219-
}
227+
this.resourceVersion = obj.metadata!.resourceVersion || '';
220228
}
221229
}
222230

src/cache_test.ts

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,9 +1096,10 @@ describe('ListWatchCache', () => {
10961096
{
10971097
metadata: {
10981098
name: 'name3',
1099+
resourceVersion: '23456',
10991100
} as V1ObjectMeta,
11001101
} as V1Namespace,
1101-
{ metadata: { resourceVersion: '23456' } },
1102+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
11021103
);
11031104

11041105
await informer.stop();
@@ -1153,9 +1154,91 @@ describe('ListWatchCache', () => {
11531154
{
11541155
metadata: {
11551156
name: 'name3',
1157+
resourceVersion: '23456',
11561158
} as V1ObjectMeta,
11571159
} as V1Namespace,
1158-
{ metadata: { resourceVersion: '23456' } },
1160+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
1161+
);
1162+
1163+
await informer.stop();
1164+
1165+
let errorEmitted = false;
1166+
informer.on('error', () => (errorEmitted = true));
1167+
1168+
promise = new Promise((resolve) => {
1169+
mock.when(
1170+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1171+
).thenCall(() => {
1172+
resolve(new FakeRequest());
1173+
});
1174+
});
1175+
1176+
informer.start();
1177+
await promise;
1178+
1179+
const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();
1180+
1181+
const object = {
1182+
kind: 'Status',
1183+
apiVersion: 'v1',
1184+
metadata: {},
1185+
status: 'Failure',
1186+
message: 'too old resource version: 12345 (1234)',
1187+
reason: 'Expired',
1188+
code: 410,
1189+
};
1190+
await watchHandler('ERROR', object, { type: 'ERROR', object });
1191+
1192+
mock.verify(
1193+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1194+
).thrice();
1195+
expect(errorEmitted).to.equal(false);
1196+
expect(listCalls).to.be.equal(2);
1197+
});
1198+
1199+
it('should list if the watch errors from the last version', async () => {
1200+
const fakeWatch = mock.mock(Watch);
1201+
const list: V1Pod[] = [];
1202+
const listObj = {
1203+
metadata: {
1204+
resourceVersion: '12345',
1205+
} as V1ListMeta,
1206+
items: list,
1207+
} as V1NamespaceList;
1208+
1209+
let listCalls = 0;
1210+
const listFn: ListPromise<V1Namespace> = function(): Promise<{
1211+
response: http.IncomingMessage;
1212+
body: V1NamespaceList;
1213+
}> {
1214+
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>((resolve) => {
1215+
listCalls++;
1216+
resolve({ response: {} as http.IncomingMessage, body: listObj });
1217+
});
1218+
};
1219+
let promise = new Promise((resolve) => {
1220+
mock.when(
1221+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1222+
).thenCall(() => {
1223+
resolve(new FakeRequest());
1224+
});
1225+
});
1226+
1227+
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
1228+
1229+
informer.start();
1230+
await promise;
1231+
1232+
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();
1233+
watchHandler(
1234+
'ADDED',
1235+
{
1236+
metadata: {
1237+
name: 'name3',
1238+
resourceVersion: '23456',
1239+
} as V1ObjectMeta,
1240+
} as V1Namespace,
1241+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
11591242
);
11601243

11611244
await informer.stop();

0 commit comments

Comments
 (0)