-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[papi] add watchWorkspace API #19010
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
145584c
[papi] add watch workspace status api
mustard-mh 22248e4
update import
mustard-mh e11560d
1
mustard-mh 50e7527
debug commit
mustard-mh 0ef6e8e
Add workspace_id to proto
mustard-mh 10f8b46
Add unit test for async generator
mustard-mh 75b1576
Update usage of func
mustard-mh 8a448fb
update test cases
mustard-mh ed01347
fix json rpc watch
mustard-mh 012ff38
1
mustard-mh 7e5c72f
remove test commit and fix missing field
mustard-mh File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
185 changes: 185 additions & 0 deletions
185
components/gitpod-protocol/src/generate-async-generator.spec.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
/** | ||
* Copyright (c) 2023 Gitpod GmbH. All rights reserved. | ||
* Licensed under the GNU Affero General Public License (AGPL). | ||
* See License.AGPL.txt in the project root for license information. | ||
*/ | ||
|
||
import { suite, test } from "@testdeck/mocha"; | ||
import * as chai from "chai"; | ||
|
||
import { generateAsyncGenerator } from "./generate-async-generator"; | ||
import { Disposable } from "./util/disposable"; | ||
|
||
const expect = chai.expect; | ||
|
||
function watchWith(times: number, listener: (value: number) => void): Disposable { | ||
let i = 0; | ||
const cancel = setInterval(() => { | ||
if (i < times) { | ||
listener(i++); | ||
} | ||
}, 100); | ||
return { | ||
dispose: () => { | ||
clearInterval(cancel); | ||
}, | ||
}; | ||
} | ||
|
||
const error = new Error("Test error"); | ||
interface Ref { | ||
isDisposed: boolean; | ||
result: number[]; | ||
watchStarted: boolean; | ||
} | ||
|
||
interface Option { | ||
errorAfter?: number; | ||
times: number; | ||
abortAfterMs?: number; | ||
setupError?: boolean; | ||
} | ||
|
||
function watchIterator(ref: Ref, opts: Option) { | ||
const abortController = new AbortController(); | ||
setTimeout(() => { | ||
abortController.abort(); | ||
}, opts.abortAfterMs ?? 600); | ||
return generateAsyncGenerator<number>( | ||
(sink) => { | ||
try { | ||
if (opts.setupError) { | ||
throw error; | ||
} | ||
ref.watchStarted = true; | ||
const dispose = watchWith(opts.times, (v) => { | ||
if (opts.errorAfter && opts.errorAfter === v) { | ||
sink.fail(error); | ||
return; | ||
} | ||
sink.push(v); | ||
}); | ||
return () => { | ||
ref.isDisposed = true; | ||
dispose.dispose(); | ||
}; | ||
} catch (e) { | ||
sink.fail(e as any as Error); | ||
} | ||
}, | ||
{ signal: abortController.signal }, | ||
); | ||
} | ||
|
||
@suite | ||
class TestGenerateAsyncGenerator { | ||
@test public async "happy path"() { | ||
const ref: Ref = { isDisposed: false, result: [], watchStarted: false }; | ||
const it = watchIterator(ref, { times: 5 }); | ||
try { | ||
for await (const v of it) { | ||
ref.result.push(v); | ||
} | ||
expect.fail("should throw error"); | ||
} catch (e) { | ||
if (ref.watchStarted) { | ||
expect(ref.isDisposed).to.be.equal(true); | ||
} | ||
expect(e.message).to.be.equal("Abort error"); | ||
expect(ref.result.length).to.be.equal(5); | ||
ref.result.forEach((v, i) => expect(v).to.be.equal(i)); | ||
expect(ref.isDisposed).to.be.equal(true); | ||
} | ||
} | ||
|
||
@test public async "should be stopped after abort signal is triggered"() { | ||
const ref: Ref = { isDisposed: false, result: [], watchStarted: false }; | ||
const it = watchIterator(ref, { times: 5, abortAfterMs: 120 }); | ||
try { | ||
for await (const v of it) { | ||
ref.result.push(v); | ||
} | ||
expect.fail("should throw error"); | ||
} catch (e) { | ||
if (ref.watchStarted) { | ||
expect(ref.isDisposed).to.be.equal(true); | ||
} | ||
expect(e.message).to.be.equal("Abort error"); | ||
expect(ref.result[0]).to.be.equal(0); | ||
expect(ref.result.length).to.be.equal(1); | ||
ref.result.forEach((v, i) => expect(v).to.be.equal(i)); | ||
expect(ref.isDisposed).to.be.equal(true); | ||
} | ||
} | ||
|
||
@test public async "should throw error if setup throws"() { | ||
const ref: Ref = { isDisposed: false, result: [], watchStarted: false }; | ||
const it = watchIterator(ref, { times: 5, setupError: true }); | ||
try { | ||
for await (const v of it) { | ||
ref.result.push(v); | ||
} | ||
expect.fail("should throw error"); | ||
} catch (e) { | ||
if (ref.watchStarted) { | ||
expect(ref.isDisposed).to.be.equal(true); | ||
} | ||
expect(e).to.be.equal(error); | ||
expect(ref.result.length).to.be.equal(0); | ||
ref.result.forEach((v, i) => expect(v).to.be.equal(i)); | ||
expect(ref.isDisposed).to.be.equal(false); | ||
} | ||
} | ||
|
||
@test public async "should propagate errors from sink.next"() { | ||
const ref: Ref = { isDisposed: false, result: [], watchStarted: false }; | ||
const it = watchIterator(ref, { times: 5, errorAfter: 2 }); | ||
try { | ||
for await (const v of it) { | ||
ref.result.push(v); | ||
} | ||
expect.fail("should throw error"); | ||
} catch (e) { | ||
if (ref.watchStarted) { | ||
expect(ref.isDisposed).to.be.equal(true); | ||
} | ||
expect(e).to.be.equal(error); | ||
expect(ref.result.length).to.be.equal(2); | ||
ref.result.forEach((v, i) => expect(v).to.be.equal(i)); | ||
expect(ref.isDisposed).to.be.equal(true); | ||
} | ||
} | ||
|
||
@test public async "should not start iterator if pre throw error in an iterator"() { | ||
const ref: Ref = { isDisposed: false, result: [], watchStarted: false }; | ||
const it = this.mockWatchWorkspaceStatus(ref, { times: 5, errorAfter: 2 }); | ||
try { | ||
for await (const v of it) { | ||
ref.result.push(v); | ||
} | ||
expect.fail("should throw error"); | ||
} catch (e) { | ||
expect(ref.watchStarted).to.be.equal(false); | ||
if (ref.watchStarted) { | ||
expect(ref.isDisposed).to.be.equal(true); | ||
} | ||
expect(e.message).to.be.equal("Should throw error"); | ||
expect(ref.result.length).to.be.equal(0); | ||
ref.result.forEach((v, i) => expect(v).to.be.equal(i)); | ||
expect(ref.isDisposed).to.be.equal(false); | ||
} | ||
} | ||
|
||
async *mockWatchWorkspaceStatus(ref: Ref, option: Option): AsyncIterable<number> { | ||
const shouldThrow = true; | ||
if (shouldThrow) { | ||
throw new Error("Should throw error"); | ||
} | ||
const it = watchIterator(ref, option); | ||
for await (const item of it) { | ||
yield item; | ||
} | ||
} | ||
} | ||
|
||
module.exports = new TestGenerateAsyncGenerator(); // Only to circumvent no usage warning :-/ |
35 changes: 35 additions & 0 deletions
35
components/gitpod-protocol/src/generate-async-generator.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/** | ||
* Copyright (c) 2023 Gitpod GmbH. All rights reserved. | ||
* Licensed under the GNU Affero General Public License (AGPL). | ||
* See License.AGPL.txt in the project root for license information. | ||
*/ | ||
|
||
import { EventIterator } from "event-iterator"; | ||
import { Queue } from "event-iterator/lib/event-iterator"; | ||
|
||
/** | ||
* Generates an asynchronous generator that yields values based on the provided setup function. | ||
* | ||
* the setup function that takes a queue and returns a cleanup function. | ||
* `queue.next` method that accepts a value to be pushed to the generator. | ||
* | ||
* remember that setup callback MUST wrap with try catch and use `queue.fail` to propagate error | ||
* | ||
* Iterator will always at least end with throw an `Abort error` | ||
*/ | ||
export function generateAsyncGenerator<T>( | ||
setup: (queue: Queue<T>) => (() => void) | void, | ||
opts: { signal: AbortSignal }, | ||
) { | ||
return new EventIterator<T>((queue) => { | ||
opts.signal.addEventListener("abort", () => { | ||
queue.fail(new Error("Abort error")); | ||
}); | ||
const dispose = setup(queue); | ||
return () => { | ||
if (dispose) { | ||
dispose(); | ||
} | ||
}; | ||
}); | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,12 +12,32 @@ service WorkspaceService { | |
// +return NOT_FOUND User does not have access to a workspace with the given | ||
// ID +return NOT_FOUND Workspace does not exist | ||
rpc GetWorkspace(GetWorkspaceRequest) returns (GetWorkspaceResponse) {} | ||
|
||
// WatchWorkspaceStatus watchs the workspaces status changes | ||
// | ||
// workspace_id +return NOT_FOUND Workspace does not exist | ||
rpc WatchWorkspaceStatus(WatchWorkspaceStatusRequest) returns (stream WatchWorkspaceStatusResponse) {} | ||
} | ||
|
||
message GetWorkspaceRequest { string id = 1; } | ||
|
||
message GetWorkspaceResponse { Workspace item = 1; } | ||
|
||
message WatchWorkspaceStatusRequest { | ||
// workspace_id specifies the workspace to watch | ||
// | ||
// +optional if empty then watch all workspaces | ||
optional string workspace_id = 1; | ||
} | ||
|
||
message WatchWorkspaceStatusResponse { | ||
// workspace_id is the ID of the workspace that has status updated | ||
string workspace_id = 1; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need |
||
|
||
// status is the updated status of workspace | ||
WorkspaceStatus status = 2; | ||
} | ||
|
||
// +resource get workspace | ||
message Workspace { | ||
string id = 1; | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.