Skip to content

Commit 70a0f12

Browse files
authored
[papi] add watchWorkspace API (#19010)
* [papi] add watch workspace status api * update import * 1 * debug commit * Add workspace_id to proto * Add unit test for async generator * Update usage of func * update test cases * fix json rpc watch * 1 * remove test commit and fix missing field
1 parent 9f692bb commit 70a0f12

File tree

15 files changed

+973
-234
lines changed

15 files changed

+973
-234
lines changed

components/dashboard/src/service/json-rpc-workspace-client.ts

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,19 @@
44
* See License.AGPL.txt in the project root for license information.
55
*/
66

7-
import { Code, ConnectError, PromiseClient } from "@connectrpc/connect";
7+
import { CallOptions, Code, ConnectError, PromiseClient } from "@connectrpc/connect";
88
import { PartialMessage } from "@bufbuild/protobuf";
99
import { WorkspaceService } from "@gitpod/public-api/lib/gitpod/v1/workspace_connect";
10-
import { GetWorkspaceRequest, GetWorkspaceResponse } from "@gitpod/public-api/lib/gitpod/v1/workspace_pb";
10+
import {
11+
GetWorkspaceRequest,
12+
GetWorkspaceResponse,
13+
WatchWorkspaceStatusRequest,
14+
WatchWorkspaceStatusResponse,
15+
} from "@gitpod/public-api/lib/gitpod/v1/workspace_pb";
1116
import { converter } from "./public-api";
1217
import { getGitpodService } from "./service";
18+
import { generateAsyncGenerator } from "@gitpod/gitpod-protocol/lib/generate-async-generator";
19+
import { WorkspaceInstance } from "@gitpod/gitpod-protocol";
1320

1421
export class JsonRpcWorkspaceClient implements PromiseClient<typeof WorkspaceService> {
1522
async getWorkspace(request: PartialMessage<GetWorkspaceRequest>): Promise<GetWorkspaceResponse> {
@@ -22,4 +29,55 @@ export class JsonRpcWorkspaceClient implements PromiseClient<typeof WorkspaceSer
2229
result.item = workspace;
2330
return result;
2431
}
32+
33+
async *watchWorkspaceStatus(
34+
request: PartialMessage<WatchWorkspaceStatusRequest>,
35+
options?: CallOptions,
36+
): AsyncIterable<WatchWorkspaceStatusResponse> {
37+
if (!options?.signal) {
38+
throw new ConnectError("signal is required", Code.InvalidArgument);
39+
}
40+
if (request.workspaceId) {
41+
const resp = await this.getWorkspace({ id: request.workspaceId });
42+
if (resp.item?.status) {
43+
const response = new WatchWorkspaceStatusResponse();
44+
response.workspaceId = resp.item.id;
45+
response.status = resp.item.status;
46+
yield response;
47+
}
48+
}
49+
const it = generateAsyncGenerator<WorkspaceInstance>(
50+
(queue) => {
51+
try {
52+
const dispose = getGitpodService().registerClient({
53+
onInstanceUpdate: (instance) => {
54+
queue.push(instance);
55+
},
56+
});
57+
return () => {
58+
dispose.dispose();
59+
};
60+
} catch (e) {
61+
queue.fail(e);
62+
}
63+
},
64+
{ signal: options.signal },
65+
);
66+
for await (const item of it) {
67+
if (!item) {
68+
continue;
69+
}
70+
if (request.workspaceId && item.workspaceId !== request.workspaceId) {
71+
continue;
72+
}
73+
const status = converter.toWorkspace(item).status;
74+
if (!status) {
75+
continue;
76+
}
77+
const response = new WatchWorkspaceStatusResponse();
78+
response.workspaceId = item.workspaceId;
79+
response.status = status;
80+
yield response;
81+
}
82+
}
2583
}

components/gitpod-protocol/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
"analytics-node": "^6.0.0",
6767
"configcat-node": "^8.0.0",
6868
"cookie": "^0.4.2",
69+
"event-iterator": "^2.0.0",
6970
"express": "^4.17.3",
7071
"google-protobuf": "^3.19.1",
7172
"inversify": "^6.0.1",
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/**
2+
* Copyright (c) 2023 Gitpod GmbH. All rights reserved.
3+
* Licensed under the GNU Affero General Public License (AGPL).
4+
* See License.AGPL.txt in the project root for license information.
5+
*/
6+
7+
import { suite, test } from "@testdeck/mocha";
8+
import * as chai from "chai";
9+
10+
import { generateAsyncGenerator } from "./generate-async-generator";
11+
import { Disposable } from "./util/disposable";
12+
13+
const expect = chai.expect;
14+
15+
function watchWith(times: number, listener: (value: number) => void): Disposable {
16+
let i = 0;
17+
const cancel = setInterval(() => {
18+
if (i < times) {
19+
listener(i++);
20+
}
21+
}, 100);
22+
return {
23+
dispose: () => {
24+
clearInterval(cancel);
25+
},
26+
};
27+
}
28+
29+
const error = new Error("Test error");
30+
interface Ref {
31+
isDisposed: boolean;
32+
result: number[];
33+
watchStarted: boolean;
34+
}
35+
36+
interface Option {
37+
errorAfter?: number;
38+
times: number;
39+
abortAfterMs?: number;
40+
setupError?: boolean;
41+
}
42+
43+
function watchIterator(ref: Ref, opts: Option) {
44+
const abortController = new AbortController();
45+
setTimeout(() => {
46+
abortController.abort();
47+
}, opts.abortAfterMs ?? 600);
48+
return generateAsyncGenerator<number>(
49+
(sink) => {
50+
try {
51+
if (opts.setupError) {
52+
throw error;
53+
}
54+
ref.watchStarted = true;
55+
const dispose = watchWith(opts.times, (v) => {
56+
if (opts.errorAfter && opts.errorAfter === v) {
57+
sink.fail(error);
58+
return;
59+
}
60+
sink.push(v);
61+
});
62+
return () => {
63+
ref.isDisposed = true;
64+
dispose.dispose();
65+
};
66+
} catch (e) {
67+
sink.fail(e as any as Error);
68+
}
69+
},
70+
{ signal: abortController.signal },
71+
);
72+
}
73+
74+
@suite
75+
class TestGenerateAsyncGenerator {
76+
@test public async "happy path"() {
77+
const ref: Ref = { isDisposed: false, result: [], watchStarted: false };
78+
const it = watchIterator(ref, { times: 5 });
79+
try {
80+
for await (const v of it) {
81+
ref.result.push(v);
82+
}
83+
expect.fail("should throw error");
84+
} catch (e) {
85+
if (ref.watchStarted) {
86+
expect(ref.isDisposed).to.be.equal(true);
87+
}
88+
expect(e.message).to.be.equal("Abort error");
89+
expect(ref.result.length).to.be.equal(5);
90+
ref.result.forEach((v, i) => expect(v).to.be.equal(i));
91+
expect(ref.isDisposed).to.be.equal(true);
92+
}
93+
}
94+
95+
@test public async "should be stopped after abort signal is triggered"() {
96+
const ref: Ref = { isDisposed: false, result: [], watchStarted: false };
97+
const it = watchIterator(ref, { times: 5, abortAfterMs: 120 });
98+
try {
99+
for await (const v of it) {
100+
ref.result.push(v);
101+
}
102+
expect.fail("should throw error");
103+
} catch (e) {
104+
if (ref.watchStarted) {
105+
expect(ref.isDisposed).to.be.equal(true);
106+
}
107+
expect(e.message).to.be.equal("Abort error");
108+
expect(ref.result[0]).to.be.equal(0);
109+
expect(ref.result.length).to.be.equal(1);
110+
ref.result.forEach((v, i) => expect(v).to.be.equal(i));
111+
expect(ref.isDisposed).to.be.equal(true);
112+
}
113+
}
114+
115+
@test public async "should throw error if setup throws"() {
116+
const ref: Ref = { isDisposed: false, result: [], watchStarted: false };
117+
const it = watchIterator(ref, { times: 5, setupError: true });
118+
try {
119+
for await (const v of it) {
120+
ref.result.push(v);
121+
}
122+
expect.fail("should throw error");
123+
} catch (e) {
124+
if (ref.watchStarted) {
125+
expect(ref.isDisposed).to.be.equal(true);
126+
}
127+
expect(e).to.be.equal(error);
128+
expect(ref.result.length).to.be.equal(0);
129+
ref.result.forEach((v, i) => expect(v).to.be.equal(i));
130+
expect(ref.isDisposed).to.be.equal(false);
131+
}
132+
}
133+
134+
@test public async "should propagate errors from sink.next"() {
135+
const ref: Ref = { isDisposed: false, result: [], watchStarted: false };
136+
const it = watchIterator(ref, { times: 5, errorAfter: 2 });
137+
try {
138+
for await (const v of it) {
139+
ref.result.push(v);
140+
}
141+
expect.fail("should throw error");
142+
} catch (e) {
143+
if (ref.watchStarted) {
144+
expect(ref.isDisposed).to.be.equal(true);
145+
}
146+
expect(e).to.be.equal(error);
147+
expect(ref.result.length).to.be.equal(2);
148+
ref.result.forEach((v, i) => expect(v).to.be.equal(i));
149+
expect(ref.isDisposed).to.be.equal(true);
150+
}
151+
}
152+
153+
@test public async "should not start iterator if pre throw error in an iterator"() {
154+
const ref: Ref = { isDisposed: false, result: [], watchStarted: false };
155+
const it = this.mockWatchWorkspaceStatus(ref, { times: 5, errorAfter: 2 });
156+
try {
157+
for await (const v of it) {
158+
ref.result.push(v);
159+
}
160+
expect.fail("should throw error");
161+
} catch (e) {
162+
expect(ref.watchStarted).to.be.equal(false);
163+
if (ref.watchStarted) {
164+
expect(ref.isDisposed).to.be.equal(true);
165+
}
166+
expect(e.message).to.be.equal("Should throw error");
167+
expect(ref.result.length).to.be.equal(0);
168+
ref.result.forEach((v, i) => expect(v).to.be.equal(i));
169+
expect(ref.isDisposed).to.be.equal(false);
170+
}
171+
}
172+
173+
async *mockWatchWorkspaceStatus(ref: Ref, option: Option): AsyncIterable<number> {
174+
const shouldThrow = true;
175+
if (shouldThrow) {
176+
throw new Error("Should throw error");
177+
}
178+
const it = watchIterator(ref, option);
179+
for await (const item of it) {
180+
yield item;
181+
}
182+
}
183+
}
184+
185+
module.exports = new TestGenerateAsyncGenerator(); // Only to circumvent no usage warning :-/
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
* Copyright (c) 2023 Gitpod GmbH. All rights reserved.
3+
* Licensed under the GNU Affero General Public License (AGPL).
4+
* See License.AGPL.txt in the project root for license information.
5+
*/
6+
7+
import { EventIterator } from "event-iterator";
8+
import { Queue } from "event-iterator/lib/event-iterator";
9+
10+
/**
11+
* Generates an asynchronous generator that yields values based on the provided setup function.
12+
*
13+
* the setup function that takes a queue and returns a cleanup function.
14+
* `queue.next` method that accepts a value to be pushed to the generator.
15+
*
16+
* remember that setup callback MUST wrap with try catch and use `queue.fail` to propagate error
17+
*
18+
* Iterator will always at least end with throw an `Abort error`
19+
*/
20+
export function generateAsyncGenerator<T>(
21+
setup: (queue: Queue<T>) => (() => void) | void,
22+
opts: { signal: AbortSignal },
23+
) {
24+
return new EventIterator<T>((queue) => {
25+
opts.signal.addEventListener("abort", () => {
26+
queue.fail(new Error("Abort error"));
27+
});
28+
const dispose = setup(queue);
29+
return () => {
30+
if (dispose) {
31+
dispose();
32+
}
33+
};
34+
});
35+
}

components/public-api/gitpod/v1/workspace.proto

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,32 @@ service WorkspaceService {
1212
// +return NOT_FOUND User does not have access to a workspace with the given
1313
// ID +return NOT_FOUND Workspace does not exist
1414
rpc GetWorkspace(GetWorkspaceRequest) returns (GetWorkspaceResponse) {}
15+
16+
// WatchWorkspaceStatus watchs the workspaces status changes
17+
//
18+
// workspace_id +return NOT_FOUND Workspace does not exist
19+
rpc WatchWorkspaceStatus(WatchWorkspaceStatusRequest) returns (stream WatchWorkspaceStatusResponse) {}
1520
}
1621

1722
message GetWorkspaceRequest { string id = 1; }
1823

1924
message GetWorkspaceResponse { Workspace item = 1; }
2025

26+
message WatchWorkspaceStatusRequest {
27+
// workspace_id specifies the workspace to watch
28+
//
29+
// +optional if empty then watch all workspaces
30+
optional string workspace_id = 1;
31+
}
32+
33+
message WatchWorkspaceStatusResponse {
34+
// workspace_id is the ID of the workspace that has status updated
35+
string workspace_id = 1;
36+
37+
// status is the updated status of workspace
38+
WorkspaceStatus status = 2;
39+
}
40+
2141
// +resource get workspace
2242
message Workspace {
2343
string id = 1;

0 commit comments

Comments
 (0)