Skip to content

Commit 8a36f16

Browse files
committed
[dashboard] add json rpc implement
1 parent a9b2883 commit 8a36f16

File tree

4 files changed

+108
-41
lines changed

4 files changed

+108
-41
lines changed

components/dashboard/src/service/json-rpc-workspace-client.ts

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,19 @@
44
* See License.AGPL.txt in the project root for license information.
55
*/
66

7-
import { Code, ConnectError, PromiseClient } from "@connectrpc/connect";
7+
import { CallOptions, Code, ConnectError, PromiseClient } from "@connectrpc/connect";
88
import { PartialMessage } from "@bufbuild/protobuf";
99
import { WorkspaceService } from "@gitpod/public-api/lib/gitpod/experimental/v2/workspace_connect";
10-
import { GetWorkspaceRequest, GetWorkspaceResponse } from "@gitpod/public-api/lib/gitpod/experimental/v2/workspace_pb";
10+
import {
11+
GetWorkspaceRequest,
12+
GetWorkspaceResponse,
13+
WatchWorkspaceRequest,
14+
WatchWorkspaceResponse,
15+
} from "@gitpod/public-api/lib/gitpod/experimental/v2/workspace_pb";
1116
import { converter } from "./public-api";
1217
import { getGitpodService } from "./service";
18+
import { generateAsyncGenerator } from "@gitpod/gitpod-protocol/lib/generate-async-generator";
19+
import { WorkspaceInstance } from "@gitpod/gitpod-protocol";
1320

1421
export class JsonRpcWorkspaceClient implements PromiseClient<typeof WorkspaceService> {
1522
async getWorkspace(request: PartialMessage<GetWorkspaceRequest>): Promise<GetWorkspaceResponse> {
@@ -22,4 +29,35 @@ export class JsonRpcWorkspaceClient implements PromiseClient<typeof WorkspaceSer
2229
result.item = workspace;
2330
return result;
2431
}
32+
33+
async *watchWorkspace(
34+
request: PartialMessage<WatchWorkspaceRequest>,
35+
options?: CallOptions,
36+
): AsyncIterable<WatchWorkspaceResponse> {
37+
if (!options?.signal) {
38+
throw new ConnectError("signal is required", Code.InvalidArgument);
39+
}
40+
const it = generateAsyncGenerator<WorkspaceInstance>(
41+
(sink) => {
42+
const dispose = getGitpodService().registerClient({
43+
onInstanceUpdate: (instance) => {
44+
sink.next(instance);
45+
},
46+
});
47+
return dispose.dispose;
48+
},
49+
{ signal: options.signal },
50+
);
51+
for await (const item of it) {
52+
if (!item) {
53+
continue;
54+
}
55+
if (request.id && item.workspaceId !== request.id) {
56+
continue;
57+
}
58+
const response = new WatchWorkspaceResponse();
59+
response.item = converter.toWorkspace(item);
60+
yield response;
61+
}
62+
}
2563
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Copyright (c) 2023 Gitpod GmbH. All rights reserved.
3+
* Licensed under the GNU Affero General Public License (AGPL).
4+
* See License.AGPL.txt in the project root for license information.
5+
*/
6+
7+
type Sink<T> = {
8+
next: (value: T | void) => void;
9+
};
10+
11+
/**
12+
* Generates an asynchronous generator that yields values based on the provided setup function.
13+
*
14+
* the setup function that takes a sink and returns a cleanup function.
15+
* setup sink object has a `next` method that accepts a value to be pushed to the generator.
16+
*/
17+
export async function* generateAsyncGenerator<T>(
18+
setup: (sink: Sink<T>) => () => void,
19+
opts: { signal: AbortSignal },
20+
): AsyncGenerator<T | void, void, unknown> {
21+
const queue: T[] = [];
22+
23+
let resolveNext: ((value: T | void) => void) | null = null;
24+
25+
const sink: Sink<T> = {
26+
next: (value: T | void) => {
27+
if (resolveNext) {
28+
resolveNext(value);
29+
resolveNext = null;
30+
} else {
31+
if (value) {
32+
queue.push(value);
33+
}
34+
}
35+
},
36+
};
37+
38+
let isStopped = false;
39+
opts.signal.addEventListener("abort", () => {
40+
isStopped = true;
41+
sink.next();
42+
});
43+
44+
const cleanup = setup(sink);
45+
46+
try {
47+
while (!isStopped) {
48+
if (queue.length) {
49+
yield queue.shift();
50+
} else {
51+
yield new Promise<T | void>((resolve) => {
52+
resolveNext = resolve;
53+
});
54+
}
55+
}
56+
} catch (e) {
57+
} finally {
58+
cleanup();
59+
}
60+
}

components/server/src/api/workspace-service-api.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ export class WorkspaceServiceAPI implements ServiceImpl<typeof WorkspaceServiceI
3434

3535
async *watchWorkspace(req: WatchWorkspaceRequest, context: HandlerContext): AsyncIterable<WatchWorkspaceResponse> {
3636
const it = this.workspaceService.watchWorkspaces(context.user.id, { signal: context.signal });
37-
context.signal.addEventListener("abort", async () => {
38-
await it.throw(new Error("abort")).then().catch();
39-
});
4037
for await (const info of it) {
4138
if (!info) {
4239
continue;

components/server/src/workspace/workspace-service.ts

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import {
3030
WorkspaceTimeoutDuration,
3131
} from "@gitpod/gitpod-protocol";
3232
import { ErrorCodes, ApplicationError } from "@gitpod/gitpod-protocol/lib/messaging/error";
33+
import { generateAsyncGenerator } from "@gitpod/gitpod-protocol/lib/generate-async-generator";
3334
import { Authorizer } from "../authorization/authorizer";
3435
import { TraceContext } from "@gitpod/gitpod-protocol/lib/util/tracing";
3536
import { WorkspaceFactory } from "./workspace-factory";
@@ -733,42 +734,13 @@ export class WorkspaceService {
733734
return urls;
734735
}
735736

736-
public async *watchWorkspaces(userId: string, opts: { signal: AbortSignal }) {
737-
const queue: WorkspaceInstance[] = [];
738-
let resolveNext: ((d: WorkspaceInstance | void) => void) | null;
739-
740-
const next = () => new Promise<WorkspaceInstance | void>((res) => (resolveNext = res));
741-
const nextOK = (d: WorkspaceInstance | void) => {
742-
if (resolveNext) {
743-
resolveNext(d);
744-
if (d) {
745-
queue.push(d);
746-
}
747-
resolveNext = null;
748-
}
749-
};
750-
751-
let isStopped = false;
752-
opts.signal.addEventListener("abort", () => {
753-
isStopped = true;
754-
nextOK();
755-
});
756-
const dispose = this.subscriber.listenForWorkspaceInstanceUpdates(userId, (_ctx, instance) => {
757-
nextOK(instance);
758-
});
759-
760-
try {
761-
while (!isStopped) {
762-
if (queue.length) {
763-
yield queue.shift();
764-
} else {
765-
yield next();
766-
}
767-
}
768-
} catch (e) {
769-
} finally {
770-
dispose.dispose();
771-
}
737+
public watchWorkspaces(userId: string, opts: { signal: AbortSignal }) {
738+
return generateAsyncGenerator<WorkspaceInstance>((sink) => {
739+
const dispose = this.subscriber.listenForWorkspaceInstanceUpdates(userId, (_ctx, instance) => {
740+
sink.next(instance);
741+
});
742+
return dispose.dispose;
743+
}, opts);
772744
}
773745

774746
public async watchWorkspaceImageBuildLogs(

0 commit comments

Comments
 (0)