Skip to content

Commit 1051a69

Browse files
committed
fix server side streams
1 parent 36a900a commit 1051a69

File tree

2 files changed

+98
-68
lines changed

2 files changed

+98
-68
lines changed

components/dashboard/src/service/service.tsx

Lines changed: 58 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -59,67 +59,70 @@ export function getGitpodService(): GitpodService {
5959
const service = _gp.gitpodService || (_gp.gitpodService = require("./service-mock").gitpodServiceMock);
6060
return service;
6161
}
62-
let user: any;
63-
const service = _gp.gitpodService || (_gp.gitpodService = createGitpodService());
64-
service.server = new Proxy(service.server, {
65-
get(target, propKey) {
66-
return async function (...args: any[]) {
67-
if (propKey === "getLoggedInUser") {
68-
user = await target[propKey](...args);
69-
return user;
70-
}
71-
if (propKey === "getWorkspace") {
72-
try {
73-
return await target[propKey](...args);
74-
} finally {
75-
const grpcType = "unary";
76-
// emulates frequent unary calls to public API
77-
const isTest = await getExperimentsClient().getValueAsync(
78-
"public_api_dummy_reliability_test",
79-
false,
80-
{ user, grpcType },
81-
);
82-
if (isTest) {
83-
helloService.sayHello({}).catch((e) => {
84-
metricsReporter.reportError(e, {
85-
userId: user?.id,
86-
workspaceId: args[0],
87-
grpcType,
62+
let service = _gp.gitpodService;
63+
if (!service) {
64+
let user: any;
65+
service = _gp.gitpodService = createGitpodService();
66+
service.server = new Proxy(service.server, {
67+
get(target, propKey) {
68+
return async function (...args: any[]) {
69+
if (propKey === "getLoggedInUser") {
70+
user = await target[propKey](...args);
71+
return user;
72+
}
73+
if (propKey === "getWorkspace") {
74+
try {
75+
return await target[propKey](...args);
76+
} finally {
77+
const grpcType = "unary";
78+
// emulates frequent unary calls to public API
79+
const isTest = await getExperimentsClient().getValueAsync(
80+
"public_api_dummy_reliability_test",
81+
false,
82+
{ user, grpcType },
83+
);
84+
if (isTest) {
85+
helloService.sayHello({}).catch((e) => {
86+
metricsReporter.reportError(e, {
87+
userId: user?.id,
88+
workspaceId: args[0],
89+
grpcType,
90+
});
91+
console.error(e);
8892
});
89-
console.error(e);
90-
});
93+
}
9194
}
9295
}
93-
}
94-
return target[propKey](...args);
95-
};
96-
},
97-
});
98-
(async () => {
99-
const grpcType = "server-stream";
100-
// emulates server side streaming with public API
101-
while (true) {
102-
const isTest = await getExperimentsClient().getValueAsync("public_api_dummy_reliability_test", false, {
103-
user,
104-
grpcType,
105-
});
106-
if (isTest) {
107-
try {
108-
let previousCount = 0;
109-
for await (const reply of helloService.lotsOfReplies({ previousCount })) {
110-
previousCount = reply.count;
96+
return target[propKey](...args);
97+
};
98+
},
99+
});
100+
(async () => {
101+
const grpcType = "server-stream";
102+
// emulates server side streaming with public API
103+
while (true) {
104+
const isTest = await getExperimentsClient().getValueAsync("public_api_dummy_reliability_test", false, {
105+
user,
106+
grpcType,
107+
});
108+
if (isTest) {
109+
try {
110+
let previousCount = 0;
111+
for await (const reply of helloService.lotsOfReplies({ previousCount })) {
112+
previousCount = reply.count;
113+
}
114+
} catch (e) {
115+
metricsReporter.reportError(e, {
116+
userId: user?.id,
117+
grpcType,
118+
});
119+
console.error(e);
111120
}
112-
} catch (e) {
113-
metricsReporter.reportError(e, {
114-
userId: user?.id,
115-
grpcType,
116-
});
117-
console.error(e);
118121
}
122+
await new Promise((resolve) => setTimeout(resolve, 3000));
119123
}
120-
await new Promise((resolve) => setTimeout(resolve, 3000));
121-
}
122-
})();
124+
})();
125+
}
123126
return service;
124127
}
125128

components/server/src/api/server.ts

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { APIStatsService } from "./stats";
2424
import { APITeamsService } from "./teams";
2525
import { APIUserService } from "./user";
2626
import { APIWorkspacesService } from "./workspaces";
27+
import { Deferred } from "@gitpod/gitpod-protocol/lib/util/deferred";
2728

2829
function service<T extends ServiceType>(type: T, impl: ServiceImpl<T>): [T, ServiceImpl<T>] {
2930
return [type, impl];
@@ -94,13 +95,12 @@ export class API {
9495
* - logging context
9596
* - tracing
9697
*/
97-
9898
private interceptService<T extends ServiceType>(type: T): ProxyHandler<ServiceImpl<T>> {
9999
const grpc_service = type.typeName;
100100
const self = this;
101101
return {
102102
get(target, prop) {
103-
return async (...args: any[]) => {
103+
return (...args: any[]) => {
104104
const method = type.methods[prop as any];
105105
if (!method) {
106106
// Increment metrics for unknown method attempts
@@ -125,21 +125,48 @@ export class API {
125125

126126
grpcServerStarted.labels(grpc_service, grpc_method, grpc_type).inc();
127127
const stopTimer = grpcServerHandling.startTimer({ grpc_service, grpc_method, grpc_type });
128+
const deferred = new Deferred<ConnectError>();
129+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
130+
deferred.promise.then((err) => {
131+
const grpc_code = err ? Code[err.code] : "OK";
132+
grpcServerHandled.labels(grpc_service, grpc_method, grpc_type, grpc_code).inc();
133+
stopTimer({ grpc_code });
134+
});
128135

129-
let grpc_code = "OK";
130-
try {
131-
const context = args[1] as HandlerContext;
136+
const context = args[1] as HandlerContext;
137+
async function call<T>(): Promise<T> {
132138
const user = await self.verify(context);
133139
context.user = user;
134-
return await (target[prop as any] as Function).apply(target, args);
135-
} catch (e) {
136-
const err = ConnectError.from(e);
137-
grpc_code = Code[err.code];
138-
throw err;
139-
} finally {
140-
grpcServerHandled.labels(grpc_service, grpc_method, grpc_type, grpc_code).inc();
141-
stopTimer({ grpc_code });
140+
141+
return (target[prop as any] as Function).apply(target, args);
142+
}
143+
if (grpc_type === "unary" || grpc_type === "client_stream") {
144+
return (async () => {
145+
try {
146+
const promise = await call<Promise<any>>();
147+
const result = await promise;
148+
deferred.resolve();
149+
return result;
150+
} catch (e) {
151+
const err = ConnectError.from(e);
152+
deferred.resolve(e);
153+
throw err;
154+
}
155+
})();
142156
}
157+
return (async function* () {
158+
try {
159+
const generator = await call<AsyncGenerator<any>>();
160+
for await (const item of generator) {
161+
yield item;
162+
}
163+
deferred.resolve();
164+
} catch (e) {
165+
const err = ConnectError.from(e);
166+
deferred.resolve(err);
167+
throw err;
168+
}
169+
})();
143170
};
144171
},
145172
};

0 commit comments

Comments
 (0)