7
7
import {
8
8
HeadlessWorkspaceEventListener ,
9
9
LocalMessageBroker ,
10
+ LocalRabbitMQBackedMessageBroker ,
10
11
PrebuildUpdateListener ,
11
12
WorkspaceInstanceUpdateListener ,
12
13
} from "./local-message-broker" ;
13
14
import { inject , injectable } from "inversify" ;
14
15
import {
15
16
Disposable ,
16
17
DisposableCollection ,
18
+ HeadlessUpdatesChannel ,
19
+ HeadlessWorkspaceEvent ,
17
20
PrebuildUpdatesChannel ,
18
21
PrebuildWithStatus ,
22
+ RedisHeadlessUpdate ,
19
23
RedisPrebuildUpdate ,
20
24
RedisWorkspaceInstanceUpdate ,
21
25
WorkspaceInstanceUpdatesChannel ,
@@ -39,11 +43,16 @@ export class RedisSubscriber implements LocalMessageBroker {
39
43
40
44
protected workspaceInstanceUpdateListeners : Map < string , WorkspaceInstanceUpdateListener [ ] > = new Map ( ) ;
41
45
protected prebuildUpdateListeners : Map < string , PrebuildUpdateListener [ ] > = new Map ( ) ;
46
+ protected headlessWorkspaceEventListeners : Map < string , HeadlessWorkspaceEventListener [ ] > = new Map ( ) ;
42
47
43
48
protected readonly disposables = new DisposableCollection ( ) ;
44
49
45
50
async start ( ) : Promise < void > {
51
+ < < < << << HEAD
46
52
const channels = [ WorkspaceInstanceUpdatesChannel , PrebuildUpdatesChannel ] ;
53
+ === === =
54
+ const channels = [ WorkspaceInstanceUpdatesChannel , HeadlessUpdatesChannel ] ;
55
+ >>> >>> > 65 ff502c2 ( [ server ] Broadcast headless updates to subscribers )
47
56
48
57
for ( const chan of channels ) {
49
58
await this . redis . subscribe ( chan ) ;
@@ -81,16 +90,27 @@ export class RedisSubscriber implements LocalMessageBroker {
81
90
return this . onInstanceUpdate ( JSON . parse ( message ) as RedisWorkspaceInstanceUpdate ) ;
82
91
83
92
case PrebuildUpdatesChannel :
84
- const headlessUpdateEnabled = await this . isRedisPubSubByTypeEnabled ( "prebuild-updatable " ) ;
85
- if ( ! headlessUpdateEnabled ) {
86
- log . debug ( "[redis] Redis headless update is disabled through feature flag" , {
93
+ const prebuildTypeEnabled = await this . isRedisPubSubByTypeEnabled ( "prebuild" ) ;
94
+ if ( ! prebuildTypeEnabled ) {
95
+ log . debug ( "[redis] Redis prebuild update is disabled through feature flag" , {
87
96
channel,
88
97
message,
89
98
} ) ;
90
99
return ;
91
100
}
92
101
return this . onPrebuildUpdate ( JSON . parse ( message ) as RedisPrebuildUpdate ) ;
93
102
103
+ case HeadlessUpdatesChannel :
104
+ const headlessTypeEnabled = await this . isRedisPubSubByTypeEnabled ( "prebuild-updatable" ) ;
105
+ if ( ! headlessTypeEnabled ) {
106
+ log . debug ( "[redis] Redis headless update is disabled through feature flag" , {
107
+ channel,
108
+ message,
109
+ } ) ;
110
+ return ;
111
+ }
112
+ return this . onHeadlessUpdate ( JSON . parse ( message ) as RedisHeadlessUpdate ) ;
113
+
94
114
default :
95
115
throw new Error ( `Redis Pub/Sub received message on unknown channel: ${ channel } ` ) ;
96
116
}
@@ -163,6 +183,29 @@ export class RedisSubscriber implements LocalMessageBroker {
163
183
}
164
184
}
165
185
186
+ private async onHeadlessUpdate ( update : RedisHeadlessUpdate ) : Promise < void > {
187
+ log . debug ( "[redis] Received prebuild update" , { update } ) ;
188
+
189
+ if ( ! update . type || ! update . workspaceID ) {
190
+ return ;
191
+ }
192
+
193
+ const listeners =
194
+ this . headlessWorkspaceEventListeners . get ( LocalRabbitMQBackedMessageBroker . UNDEFINED_KEY ) || [ ] ;
195
+ if ( listeners . length === 0 ) {
196
+ return ;
197
+ }
198
+
199
+ const ctx = { } ;
200
+ for ( const l of listeners ) {
201
+ try {
202
+ l ( ctx , update ) ;
203
+ } catch ( err ) {
204
+ log . error ( "Failed to broadcast headless update." , update , err ) ;
205
+ }
206
+ }
207
+ }
208
+
166
209
async stop ( ) {
167
210
this . disposables . dispose ( ) ;
168
211
}
@@ -172,8 +215,12 @@ export class RedisSubscriber implements LocalMessageBroker {
172
215
}
173
216
174
217
listenForPrebuildUpdatableEvents ( listener : HeadlessWorkspaceEventListener ) : Disposable {
175
- // TODO: not implemented
176
- return Disposable . create ( ( ) => { } ) ;
218
+ // we're being cheap here in re-using a map where it just needs to be a plain array.
219
+ return this . doRegister (
220
+ LocalRabbitMQBackedMessageBroker . UNDEFINED_KEY ,
221
+ listener ,
222
+ this . headlessWorkspaceEventListeners ,
223
+ ) ;
177
224
}
178
225
179
226
listenForWorkspaceInstanceUpdates ( userId : string , listener : WorkspaceInstanceUpdateListener ) : Disposable {
0 commit comments