Skip to content

Commit a9b2883

Browse files
committed
fix abort
1 parent 4ac219e commit a9b2883

File tree

2 files changed

+20
-13
lines changed

2 files changed

+20
-13
lines changed

components/server/src/api/workspace-service-api.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ export class WorkspaceServiceAPI implements ServiceImpl<typeof WorkspaceServiceI
3333
}
3434

3535
async *watchWorkspace(req: WatchWorkspaceRequest, context: HandlerContext): AsyncIterable<WatchWorkspaceResponse> {
36-
const it = this.workspaceService.watchWorkspaces(context.user.id);
36+
const it = this.workspaceService.watchWorkspaces(context.user.id, { signal: context.signal });
3737
context.signal.addEventListener("abort", async () => {
38-
await it.throw(new Error("abort"));
38+
await it.throw(new Error("abort")).then().catch();
3939
});
4040
for await (const info of it) {
4141
if (!info) {

components/server/src/workspace/workspace-service.ts

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -733,29 +733,36 @@ export class WorkspaceService {
733733
return urls;
734734
}
735735

736-
public async *watchWorkspaces(userId: string) {
737-
let resolveNext: (() => void) | null;
738-
const next = () => new Promise<void>((res) => (resolveNext = res));
739-
const nextOK = () => {
736+
public async *watchWorkspaces(userId: string, opts: { signal: AbortSignal }) {
737+
const queue: WorkspaceInstance[] = [];
738+
let resolveNext: ((d: WorkspaceInstance | void) => void) | null;
739+
740+
const next = () => new Promise<WorkspaceInstance | void>((res) => (resolveNext = res));
741+
const nextOK = (d: WorkspaceInstance | void) => {
740742
if (resolveNext) {
741-
resolveNext();
743+
resolveNext(d);
744+
if (d) {
745+
queue.push(d);
746+
}
742747
resolveNext = null;
743748
}
744749
};
745750

746-
const queue: WorkspaceInstance[] = [];
747-
748-
const dispose = this.subscriber.listenForWorkspaceInstanceUpdates(userId, (_ctx, instance) => {
749-
queue.push(instance);
751+
let isStopped = false;
752+
opts.signal.addEventListener("abort", () => {
753+
isStopped = true;
750754
nextOK();
751755
});
756+
const dispose = this.subscriber.listenForWorkspaceInstanceUpdates(userId, (_ctx, instance) => {
757+
nextOK(instance);
758+
});
752759

753760
try {
754-
while (true) {
761+
while (!isStopped) {
755762
if (queue.length) {
756763
yield queue.shift();
757764
} else {
758-
await next();
765+
yield next();
759766
}
760767
}
761768
} catch (e) {

0 commit comments

Comments
 (0)