@@ -14,6 +14,9 @@ import { inject, injectable } from "inversify";
14
14
import {
15
15
Disposable ,
16
16
DisposableCollection ,
17
+ PrebuildUpdatesChannel ,
18
+ PrebuildWithStatus ,
19
+ RedisPrebuildUpdate ,
17
20
RedisWorkspaceInstanceUpdate ,
18
21
WorkspaceInstanceUpdatesChannel ,
19
22
} from "@gitpod/gitpod-protocol" ;
@@ -35,11 +38,12 @@ export class RedisSubscriber implements LocalMessageBroker {
35
38
) { }
36
39
37
40
protected workspaceInstanceUpdateListeners : Map < string , WorkspaceInstanceUpdateListener [ ] > = new Map ( ) ;
41
+ protected prebuildUpdateListeners : Map < string , PrebuildUpdateListener [ ] > = new Map ( ) ;
38
42
39
43
protected readonly disposables = new DisposableCollection ( ) ;
40
44
41
45
async start ( ) : Promise < void > {
42
- const channels = [ WorkspaceInstanceUpdatesChannel ] ;
46
+ const channels = [ WorkspaceInstanceUpdatesChannel , PrebuildUpdatesChannel ] ;
43
47
44
48
for ( const chan of channels ) {
45
49
await this . redis . subscribe ( chan ) ;
@@ -65,17 +69,28 @@ export class RedisSubscriber implements LocalMessageBroker {
65
69
private async onMessage ( channel : string , message : string ) : Promise < void > {
66
70
switch ( channel ) {
67
71
case WorkspaceInstanceUpdatesChannel :
68
- const enabled = await this . isRedisPubSubByTypeEnabled ( "workspace-instance" ) ;
69
- if ( ! enabled ) {
72
+ const wsInstanceTypeEnabled = await this . isRedisPubSubByTypeEnabled ( "workspace-instance" ) ;
73
+ if ( ! wsInstanceTypeEnabled ) {
70
74
log . debug ( "[redis] Redis workspace instance update is disabled through feature flag" , {
71
75
channel,
72
76
message,
73
77
} ) ;
74
78
return ;
75
79
}
76
80
77
- const parsed = JSON . parse ( message ) as RedisWorkspaceInstanceUpdate ;
78
- return this . onInstanceUpdate ( parsed ) ;
81
+ return this . onInstanceUpdate ( JSON . parse ( message ) as RedisWorkspaceInstanceUpdate ) ;
82
+
83
+ case PrebuildUpdatesChannel :
84
+ const prebuildTypeEnabled = await this . isRedisPubSubByTypeEnabled ( "prebuild" ) ;
85
+ if ( ! prebuildTypeEnabled ) {
86
+ log . debug ( "[redis] Redis prebuild update is disabled through feature flag" , {
87
+ channel,
88
+ message,
89
+ } ) ;
90
+ return ;
91
+ }
92
+ return this . onPrebuildUpdate ( JSON . parse ( message ) as RedisPrebuildUpdate ) ;
93
+
79
94
default :
80
95
throw new Error ( `Redis Pub/Sub received message on unknown channel: ${ channel } ` ) ;
81
96
}
@@ -112,13 +127,48 @@ export class RedisSubscriber implements LocalMessageBroker {
112
127
}
113
128
}
114
129
130
+ private async onPrebuildUpdate ( update : RedisPrebuildUpdate ) : Promise < void > {
131
+ log . debug ( "[redis] Received prebuild update" , { update } ) ;
132
+
133
+ if ( ! update . projectID || ! update . prebuildID || ! update . status ) {
134
+ return ;
135
+ }
136
+
137
+ const listeners = this . prebuildUpdateListeners . get ( update . projectID ) || [ ] ;
138
+ if ( listeners . length === 0 ) {
139
+ return ;
140
+ }
141
+
142
+ const ctx = { } ;
143
+ const info = ( await this . workspaceDB . findPrebuildInfos ( [ update . prebuildID ] ) ) [ 0 ] ;
144
+ if ( ! info ) {
145
+ return ;
146
+ }
147
+
148
+ const prebuildWithStatus : PrebuildWithStatus = {
149
+ info : info ,
150
+ status : update . status ,
151
+ } ;
152
+
153
+ for ( const l of listeners ) {
154
+ try {
155
+ l ( ctx , prebuildWithStatus ) ;
156
+ } catch ( err ) {
157
+ log . error (
158
+ "Failed to broadcast workspace instance update." ,
159
+ { projectId : update . projectID , instanceId : update . instanceID , workspaceId : update . workspaceID } ,
160
+ err ,
161
+ ) ;
162
+ }
163
+ }
164
+ }
165
+
115
166
async stop ( ) {
116
167
this . disposables . dispose ( ) ;
117
168
}
118
169
119
170
listenForPrebuildUpdates ( projectId : string , listener : PrebuildUpdateListener ) : Disposable {
120
- // TODO: not implemented
121
- return Disposable . create ( ( ) => { } ) ;
171
+ return this . doRegister ( projectId , listener , this . prebuildUpdateListeners ) ;
122
172
}
123
173
124
174
listenForPrebuildUpdatableEvents ( listener : HeadlessWorkspaceEventListener ) : Disposable {
0 commit comments